diff --git a/api/v1alpha1/constants.go b/api/v1alpha1/constants.go index 8d4aa9b9..7abc58d8 100644 --- a/api/v1alpha1/constants.go +++ b/api/v1alpha1/constants.go @@ -17,6 +17,8 @@ package v1alpha1 const ( // ConditionReady indicates status condition ready ConditionReady string = "Ready" + // ConditionTopicPolicyReady indicates the topic policy ready + ConditionTopicPolicyReady string = "PolicyReady" // FinalizerName is the finalizer string that add to object FinalizerName string = "cloud.streamnative.io/finalizer" diff --git a/api/v1alpha1/pulsartopic_types.go b/api/v1alpha1/pulsartopic_types.go index 97905aca..29cef96b 100644 --- a/api/v1alpha1/pulsartopic_types.go +++ b/api/v1alpha1/pulsartopic_types.go @@ -137,6 +137,7 @@ type PulsarTopicStatus struct { //+kubebuilder:printcolumn:name="GENERATION",type=string,JSONPath=`.metadata.generation` //+kubebuilder:printcolumn:name="OBSERVED_GENERATION",type=string,JSONPath=`.status.observedGeneration` //+kubebuilder:printcolumn:name="READY",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].status` +//+kubebuilder:printcolumn:name="POLICY_READY",type=string,JSONPath=`.status.conditions[?(@.type=="PolicyReady")].status` // PulsarTopic is the Schema for the pulsartopics API type PulsarTopic struct { diff --git a/config/crd/bases/resource.streamnative.io_pulsartopics.yaml b/config/crd/bases/resource.streamnative.io_pulsartopics.yaml index 09a384a1..c86fc997 100644 --- a/config/crd/bases/resource.streamnative.io_pulsartopics.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsartopics.yaml @@ -46,6 +46,9 @@ spec: - jsonPath: .status.conditions[?(@.type=="Ready")].status name: READY type: string + - jsonPath: .status.conditions[?(@.type=="PolicyReady")].status + name: POLICY_READY + type: string name: v1alpha1 schema: openAPIV3Schema: diff --git a/pkg/admin/dummy.go b/pkg/admin/dummy.go index 28609821..103775c3 100644 --- a/pkg/admin/dummy.go +++ b/pkg/admin/dummy.go @@ -60,8 +60,8 @@ func (d *DummyPulsarAdmin) SetNamespaceClusters(string, []string) error { } // ApplyTopic is a fake implements of ApplyTopic -func (d *DummyPulsarAdmin) ApplyTopic(string, *TopicParams) error { - return nil +func (d *DummyPulsarAdmin) ApplyTopic(string, *TopicParams) (error, error) { + return nil, nil } // DeleteTopic is a fake implements of DeleteTopic diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 6dcca462..b6e9897e 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -124,32 +124,32 @@ func (p *PulsarAdminClient) SetNamespaceClusters(completeNSName string, clusters } // ApplyTopic creates a topic with policies -func (p *PulsarAdminClient) ApplyTopic(name string, params *TopicParams) error { +func (p *PulsarAdminClient) ApplyTopic(name string, params *TopicParams) (creationErr error, policyErr error) { completeTopicName := makeCompleteTopicName(name, params.Persistent) topicName, err := utils.GetTopicName(completeTopicName) if err != nil { - return err + return err, nil } partitionNum := int(*params.Partitions) err = p.adminClient.Topics().Create(*topicName, partitionNum) if err != nil { if !IsAlreadyExist(err) { - return err + return err, nil } if partitionNum > 0 { // for partitioned topic, allow to change the partition number if err = p.adminClient.Topics().Update(*topicName, partitionNum); err != nil { - return err + return nil, err } } } err = p.applyTopicPolicies(topicName, params) if err != nil { - return err + return nil, err } - return nil + return nil, nil } // DeleteTenant deletes a specific tenant diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index 4b0bc9b3..b400d3d5 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -110,7 +110,7 @@ type PulsarAdmin interface { SetNamespaceClusters(name string, clusters []string) error // ApplyTopic creates a topic with parameters - ApplyTopic(name string, params *TopicParams) error + ApplyTopic(name string, params *TopicParams) (error, error) // DeleteTopic delete a specific topic DeleteTopic(name string) error diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index 8c001e47..1b7ee03d 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -23,6 +23,7 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/feature" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" @@ -61,7 +62,7 @@ func (r *PulsarTopicReconciler) Observe(ctx context.Context) error { r.conn.topics = topicsList.Items for i := range r.conn.topics { - if !resourcev1alpha1.IsPulsarResourceReady(&r.conn.topics[i]) { + if !isPulsarTopicResourceReady(&r.conn.topics[i]) { r.conn.addUnreadyResource(&r.conn.topics[i]) } } @@ -75,7 +76,9 @@ func (r *PulsarTopicReconciler) Reconcile(ctx context.Context) error { for i := range r.conn.topics { topic := &r.conn.topics[i] if err := r.ReconcileTopic(ctx, r.conn.pulsarAdmin, topic); err != nil { - return fmt.Errorf("reconcile topic [%w]", err) + // return error will stop the other reconcile process + r.log.Error(err, "Failed to reconcile topic", "topicName", topic.Spec.Name) + continue } } return nil @@ -134,12 +137,39 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin } } - if resourcev1alpha1.IsPulsarResourceReady(topic) && + if isPulsarTopicResourceReady(topic) && !feature.DefaultFeatureGate.Enabled(feature.AlwaysUpdatePulsarResource) { log.Info("Skip reconcile, topic resource is ready") return nil } + var policyErrs []error + var creationErr error + defer func() { + if creationErr != nil { + meta.SetStatusCondition(&topic.Status.Conditions, + NewTopicErrorCondition(topic.Generation, resourcev1alpha1.ConditionReady, creationErr.Error())) + } else { + meta.SetStatusCondition(&topic.Status.Conditions, + NewTopicReadyCondition(topic.Generation, resourcev1alpha1.ConditionReady)) + } + if len(policyErrs) != 0 || creationErr != nil { + msg := "" + for _, err := range policyErrs { + msg += err.Error() + ";\n" + } + meta.SetStatusCondition(&topic.Status.Conditions, + NewTopicErrorCondition(topic.Generation, resourcev1alpha1.ConditionTopicPolicyReady, msg)) + } else { + meta.SetStatusCondition(&topic.Status.Conditions, + NewTopicReadyCondition(topic.Generation, resourcev1alpha1.ConditionTopicPolicyReady)) + } + + if err := r.conn.client.Status().Update(ctx, topic); err != nil { + log.Error(err, "Failed to update status") + } + }() + params := createTopicParams(topic) r.applyDefault(params) @@ -147,8 +177,8 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin if refs := topic.Spec.GeoReplicationRefs; len(refs) != 0 { for _, ref := range refs { if err := r.applyGeo(ctx, params, ref, topic); err != nil { - log.Error(err, "Failed to get destination connection for geo replication") - return err + log.Error(err, "Failed to get destination connection for geo replication "+ref.Name) + policyErrs = append(policyErrs, err) } } @@ -161,16 +191,23 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin topic.Status.GeoReplicationEnabled = false } - if err := pulsarAdmin.ApplyTopic(topic.Spec.Name, params); err != nil { - meta.SetStatusCondition(&topic.Status.Conditions, *NewErrorCondition(topic.Generation, err.Error())) - log.Error(err, "Failed to apply topic") - if err := r.conn.client.Status().Update(ctx, topic); err != nil { - log.Error(err, "Failed to update the topic status") - return nil - } - return err + creationErr, policyErr := pulsarAdmin.ApplyTopic(topic.Spec.Name, params) + if policyErr != nil { + policyErrs = append(policyErrs, policyErr) + } + if creationErr != nil { + return creationErr } + if err := applySchema(pulsarAdmin, topic, log); err != nil { + policyErrs = append(policyErrs, err) + } + + topic.Status.ObservedGeneration = topic.Generation + return nil +} + +func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTopic, log logr.Logger) error { schema, serr := pulsarAdmin.GetSchema(topic.Spec.Name) if serr != nil && !admin.IsNotFound(serr) { return serr @@ -186,29 +223,16 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin } log.Info("Upload schema for the topic", "name", topic.Spec.Name, "type", info.Type, "schema", info.Schema, "properties", info.Properties) if err := pulsarAdmin.UploadSchema(topic.Spec.Name, param); err != nil { - log.Error(err, "Failed to upload schema") - if err := r.conn.client.Status().Update(ctx, topic); err != nil { - log.Error(err, "Failed to upload schema for the topic") - return nil - } return err } } } else if schema != nil { // Delete the schema when the schema exists and schema info is empty log.Info("Deleting topic schema", "name", topic.Spec.Name) - err := pulsarAdmin.DeleteSchema(topic.Spec.Name) - if err != nil { + if err := pulsarAdmin.DeleteSchema(topic.Spec.Name); err != nil { return err } } - - topic.Status.ObservedGeneration = topic.Generation - meta.SetStatusCondition(&topic.Status.Conditions, *NewReadyCondition(topic.Generation)) - if err := r.conn.client.Status().Update(ctx, topic); err != nil { - log.Error(err, "Failed to update the topic status") - return err - } return nil } @@ -263,3 +287,30 @@ func (r *PulsarTopicReconciler) applyGeo(ctx context.Context, params *admin.Topi params.ReplicationClusters = append(params.ReplicationClusters, r.conn.connection.Spec.ClusterName) return nil } + +func isPulsarTopicResourceReady(topic *resourcev1alpha1.PulsarTopic) bool { + condition := meta.FindStatusCondition(topic.Status.Conditions, resourcev1alpha1.ConditionTopicPolicyReady) + return resourcev1alpha1.IsPulsarResourceReady(topic) && condition != nil && condition.Status == metav1.ConditionTrue +} + +// NewTopicReadyCondition make condition with ready info +func NewTopicReadyCondition(generation int64, conditionType string) metav1.Condition { + return metav1.Condition{ + Type: conditionType, + Status: metav1.ConditionTrue, + ObservedGeneration: generation, + Reason: "Reconciled", + Message: "", + } +} + +// NewTopicErrorCondition make condition with ready info +func NewTopicErrorCondition(generation int64, conditionType, msg string) metav1.Condition { + return metav1.Condition{ + Type: conditionType, + Status: metav1.ConditionFalse, + ObservedGeneration: generation, + Reason: "ReconcileError", + Message: msg, + } +}