Skip to content

Commit

Permalink
Imporve topic reconcile logic (#225)
Browse files Browse the repository at this point in the history
* improve

* fix ci

* fix ci

* add display column

* improve
  • Loading branch information
labuladong authored Aug 1, 2024
1 parent 93a49cb commit 5a4c8bb
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 36 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha1/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package v1alpha1
const (
// ConditionReady indicates status condition ready
ConditionReady string = "Ready"
// ConditionTopicPolicyReady indicates the topic policy ready
ConditionTopicPolicyReady string = "PolicyReady"
// FinalizerName is the finalizer string that add to object
FinalizerName string = "cloud.streamnative.io/finalizer"

Expand Down
1 change: 1 addition & 0 deletions api/v1alpha1/pulsartopic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ type PulsarTopicStatus struct {
//+kubebuilder:printcolumn:name="GENERATION",type=string,JSONPath=`.metadata.generation`
//+kubebuilder:printcolumn:name="OBSERVED_GENERATION",type=string,JSONPath=`.status.observedGeneration`
//+kubebuilder:printcolumn:name="READY",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].status`
//+kubebuilder:printcolumn:name="POLICY_READY",type=string,JSONPath=`.status.conditions[?(@.type=="PolicyReady")].status`

// PulsarTopic is the Schema for the pulsartopics API
type PulsarTopic struct {
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/resource.streamnative.io_pulsartopics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ spec:
- jsonPath: .status.conditions[?(@.type=="Ready")].status
name: READY
type: string
- jsonPath: .status.conditions[?(@.type=="PolicyReady")].status
name: POLICY_READY
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down
4 changes: 2 additions & 2 deletions pkg/admin/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (d *DummyPulsarAdmin) SetNamespaceClusters(string, []string) error {
}

// ApplyTopic is a fake implements of ApplyTopic
func (d *DummyPulsarAdmin) ApplyTopic(string, *TopicParams) error {
return nil
func (d *DummyPulsarAdmin) ApplyTopic(string, *TopicParams) (error, error) {
return nil, nil
}

// DeleteTopic is a fake implements of DeleteTopic
Expand Down
12 changes: 6 additions & 6 deletions pkg/admin/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,32 +124,32 @@ func (p *PulsarAdminClient) SetNamespaceClusters(completeNSName string, clusters
}

// ApplyTopic creates a topic with policies
func (p *PulsarAdminClient) ApplyTopic(name string, params *TopicParams) error {
func (p *PulsarAdminClient) ApplyTopic(name string, params *TopicParams) (creationErr error, policyErr error) {
completeTopicName := makeCompleteTopicName(name, params.Persistent)
topicName, err := utils.GetTopicName(completeTopicName)
if err != nil {
return err
return err, nil
}
partitionNum := int(*params.Partitions)
err = p.adminClient.Topics().Create(*topicName, partitionNum)
if err != nil {
if !IsAlreadyExist(err) {
return err
return err, nil
}
if partitionNum > 0 {
// for partitioned topic, allow to change the partition number
if err = p.adminClient.Topics().Update(*topicName, partitionNum); err != nil {
return err
return nil, err
}
}
}

err = p.applyTopicPolicies(topicName, params)
if err != nil {
return err
return nil, err
}

return nil
return nil, nil
}

// DeleteTenant deletes a specific tenant
Expand Down
2 changes: 1 addition & 1 deletion pkg/admin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type PulsarAdmin interface {
SetNamespaceClusters(name string, clusters []string) error

// ApplyTopic creates a topic with parameters
ApplyTopic(name string, params *TopicParams) error
ApplyTopic(name string, params *TopicParams) (error, error)

// DeleteTopic delete a specific topic
DeleteTopic(name string) error
Expand Down
105 changes: 78 additions & 27 deletions pkg/connection/reconcile_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -61,7 +62,7 @@ func (r *PulsarTopicReconciler) Observe(ctx context.Context) error {

r.conn.topics = topicsList.Items
for i := range r.conn.topics {
if !resourcev1alpha1.IsPulsarResourceReady(&r.conn.topics[i]) {
if !isPulsarTopicResourceReady(&r.conn.topics[i]) {
r.conn.addUnreadyResource(&r.conn.topics[i])
}
}
Expand All @@ -75,7 +76,9 @@ func (r *PulsarTopicReconciler) Reconcile(ctx context.Context) error {
for i := range r.conn.topics {
topic := &r.conn.topics[i]
if err := r.ReconcileTopic(ctx, r.conn.pulsarAdmin, topic); err != nil {
return fmt.Errorf("reconcile topic [%w]", err)
// return error will stop the other reconcile process
r.log.Error(err, "Failed to reconcile topic", "topicName", topic.Spec.Name)
continue
}
}
return nil
Expand Down Expand Up @@ -134,21 +137,48 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
}
}

if resourcev1alpha1.IsPulsarResourceReady(topic) &&
if isPulsarTopicResourceReady(topic) &&
!feature.DefaultFeatureGate.Enabled(feature.AlwaysUpdatePulsarResource) {
log.Info("Skip reconcile, topic resource is ready")
return nil
}

var policyErrs []error
var creationErr error
defer func() {
if creationErr != nil {
meta.SetStatusCondition(&topic.Status.Conditions,
NewTopicErrorCondition(topic.Generation, resourcev1alpha1.ConditionReady, creationErr.Error()))
} else {
meta.SetStatusCondition(&topic.Status.Conditions,
NewTopicReadyCondition(topic.Generation, resourcev1alpha1.ConditionReady))
}
if len(policyErrs) != 0 || creationErr != nil {
msg := ""
for _, err := range policyErrs {
msg += err.Error() + ";\n"
}
meta.SetStatusCondition(&topic.Status.Conditions,
NewTopicErrorCondition(topic.Generation, resourcev1alpha1.ConditionTopicPolicyReady, msg))
} else {
meta.SetStatusCondition(&topic.Status.Conditions,
NewTopicReadyCondition(topic.Generation, resourcev1alpha1.ConditionTopicPolicyReady))
}

if err := r.conn.client.Status().Update(ctx, topic); err != nil {
log.Error(err, "Failed to update status")
}
}()

params := createTopicParams(topic)

r.applyDefault(params)

if refs := topic.Spec.GeoReplicationRefs; len(refs) != 0 {
for _, ref := range refs {
if err := r.applyGeo(ctx, params, ref, topic); err != nil {
log.Error(err, "Failed to get destination connection for geo replication")
return err
log.Error(err, "Failed to get destination connection for geo replication "+ref.Name)
policyErrs = append(policyErrs, err)
}
}

Expand All @@ -161,16 +191,23 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
topic.Status.GeoReplicationEnabled = false
}

if err := pulsarAdmin.ApplyTopic(topic.Spec.Name, params); err != nil {
meta.SetStatusCondition(&topic.Status.Conditions, *NewErrorCondition(topic.Generation, err.Error()))
log.Error(err, "Failed to apply topic")
if err := r.conn.client.Status().Update(ctx, topic); err != nil {
log.Error(err, "Failed to update the topic status")
return nil
}
return err
creationErr, policyErr := pulsarAdmin.ApplyTopic(topic.Spec.Name, params)
if policyErr != nil {
policyErrs = append(policyErrs, policyErr)
}
if creationErr != nil {
return creationErr
}

if err := applySchema(pulsarAdmin, topic, log); err != nil {
policyErrs = append(policyErrs, err)
}

topic.Status.ObservedGeneration = topic.Generation
return nil
}

func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTopic, log logr.Logger) error {
schema, serr := pulsarAdmin.GetSchema(topic.Spec.Name)
if serr != nil && !admin.IsNotFound(serr) {
return serr
Expand All @@ -186,29 +223,16 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
}
log.Info("Upload schema for the topic", "name", topic.Spec.Name, "type", info.Type, "schema", info.Schema, "properties", info.Properties)
if err := pulsarAdmin.UploadSchema(topic.Spec.Name, param); err != nil {
log.Error(err, "Failed to upload schema")
if err := r.conn.client.Status().Update(ctx, topic); err != nil {
log.Error(err, "Failed to upload schema for the topic")
return nil
}
return err
}
}
} else if schema != nil {
// Delete the schema when the schema exists and schema info is empty
log.Info("Deleting topic schema", "name", topic.Spec.Name)
err := pulsarAdmin.DeleteSchema(topic.Spec.Name)
if err != nil {
if err := pulsarAdmin.DeleteSchema(topic.Spec.Name); err != nil {
return err
}
}

topic.Status.ObservedGeneration = topic.Generation
meta.SetStatusCondition(&topic.Status.Conditions, *NewReadyCondition(topic.Generation))
if err := r.conn.client.Status().Update(ctx, topic); err != nil {
log.Error(err, "Failed to update the topic status")
return err
}
return nil
}

Expand Down Expand Up @@ -263,3 +287,30 @@ func (r *PulsarTopicReconciler) applyGeo(ctx context.Context, params *admin.Topi
params.ReplicationClusters = append(params.ReplicationClusters, r.conn.connection.Spec.ClusterName)
return nil
}

func isPulsarTopicResourceReady(topic *resourcev1alpha1.PulsarTopic) bool {
condition := meta.FindStatusCondition(topic.Status.Conditions, resourcev1alpha1.ConditionTopicPolicyReady)
return resourcev1alpha1.IsPulsarResourceReady(topic) && condition != nil && condition.Status == metav1.ConditionTrue
}

// NewTopicReadyCondition make condition with ready info
func NewTopicReadyCondition(generation int64, conditionType string) metav1.Condition {
return metav1.Condition{
Type: conditionType,
Status: metav1.ConditionTrue,
ObservedGeneration: generation,
Reason: "Reconciled",
Message: "",
}
}

// NewTopicErrorCondition make condition with ready info
func NewTopicErrorCondition(generation int64, conditionType, msg string) metav1.Condition {
return metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
ObservedGeneration: generation,
Reason: "ReconcileError",
Message: msg,
}
}

0 comments on commit 5a4c8bb

Please sign in to comment.