From 0ecf4db64ff9ef8c3cb178f3e75b4e62329654e4 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 15 Apr 2022 15:19:42 +0800 Subject: [PATCH] fix: add retry operation to avoid the topic not found (#50) Fix #48 ### Motivation When we created many topics at one time, then update these topic policies, the Pulsar always returns the topic not found, so add a retry operation. --- go.mod | 1 + go.sum | 3 +++ pulsar/resource_pulsar_topic.go | 42 +++++++++++++++++---------------- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/go.mod b/go.mod index 8950956..7d501a6 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/aws/aws-sdk-go v1.25.48 // indirect + github.com/cenkalti/backoff/v4 v4.1.2 github.com/hashicorp/go-multierror v1.0.0 github.com/hashicorp/hcl/v2 v2.1.0 // indirect github.com/hashicorp/terraform-plugin-sdk v1.4.0 diff --git a/go.sum b/go.sum index dbc2827..c2cd73d 100644 --- a/go.sum +++ b/go.sum @@ -65,7 +65,10 @@ github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1U github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d/go.mod h1:6QX/PXZ00z/TKoufEY6K/a0k6AhaJrQKdFe6OfVXsa4= github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= +github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cheggaaa/pb v1.0.27/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= diff --git a/pulsar/resource_pulsar_topic.go b/pulsar/resource_pulsar_topic.go index c6d382c..6fa88de 100644 --- a/pulsar/resource_pulsar_topic.go +++ b/pulsar/resource_pulsar_topic.go @@ -20,6 +20,7 @@ package pulsar import ( "fmt" + "github.com/cenkalti/backoff/v4" "github.com/hashicorp/terraform-plugin-sdk/helper/schema" "github.com/pkg/errors" "github.com/streamnative/pulsarctl/pkg/pulsar" @@ -149,17 +150,11 @@ func resourcePulsarTopicCreate(d *schema.ResourceData, meta interface{}) error { return fmt.Errorf("ERROR_CREATE_TOPIC_PERMISSION_GRANT: %w", err) } - if topicName.IsPersistent() { - err = updateRetentionPolicies(d, meta, topicName) - if err != nil { - return fmt.Errorf("ERROR_CREATE_TOPIC_RETENTION_POLICIES: %w", err) - } - } else { - retentionPoliciesConfig := d.Get("retention_policies").(*schema.Set) - if retentionPoliciesConfig.Len() != 0 { - return fmt.Errorf("ERROR_CREATE_TOPIC_RETENTION_POLICIES: " + - "unsupported set retention policies for non-persistent topic") - } + err = retry(func() error { + return updateRetentionPolicies(d, meta, topicName) + }) + if err != nil { + return fmt.Errorf("ERROR_CREATE_TOPIC_RETENTION_POLICIES: %w", err) } return resourcePulsarTopicRead(d, meta) @@ -197,7 +192,6 @@ func resourcePulsarTopicRead(d *schema.ResourceData, meta interface{}) error { if retPoliciesCfg, ok := d.GetOk("retention_policies"); ok && retPoliciesCfg.(*schema.Set).Len() > 0 { if topicName.IsPersistent() { - ret, err := client.GetRetention(*topicName, true) if err != nil { return fmt.Errorf("ERROR_READ_TOPIC: GetRetention: %w", err) @@ -209,6 +203,8 @@ func resourcePulsarTopicRead(d *schema.ResourceData, meta interface{}) error { "retention_size_mb": int(ret.RetentionSizeInMB), }, }) + } else { + return errors.New("ERROR_READ_TOPIC: unsupported get retention policies for non-persistent topic") } } @@ -374,20 +370,22 @@ func updateRetentionPolicies(d *schema.ResourceData, meta interface{}, topicName client := meta.(pulsar.Client).Topics() retentionPoliciesConfig := d.Get("retention_policies").(*schema.Set) + if retentionPoliciesConfig.Len() == 0 { + return nil + } + if !topicName.IsPersistent() { return errors.New("ERROR_UPDATE_RETENTION_POLICIES: SetRetention: " + "unsupported set retention policies for non-persistent topic") } - if retentionPoliciesConfig.Len() > 0 { - var policies utils.RetentionPolicies - data := retentionPoliciesConfig.List()[0].(map[string]interface{}) - policies.RetentionTimeInMinutes = data["retention_time_minutes"].(int) - policies.RetentionSizeInMB = int64(data["retention_size_mb"].(int)) + var policies utils.RetentionPolicies + data := retentionPoliciesConfig.List()[0].(map[string]interface{}) + policies.RetentionTimeInMinutes = data["retention_time_minutes"].(int) + policies.RetentionSizeInMB = int64(data["retention_size_mb"].(int)) - if err := client.SetRetention(*topicName, policies); err != nil { - return fmt.Errorf("ERROR_UPDATE_RETENTION_POLICIES: SetRetention: %w", err) - } + if err := client.SetRetention(*topicName, policies); err != nil { + return fmt.Errorf("ERROR_UPDATE_RETENTION_POLICIES: SetRetention: %w", err) } return nil @@ -412,3 +410,7 @@ func updatePartitions(d *schema.ResourceData, meta interface{}, topicName *utils return nil } + +func retry(operation func() error) error { + return backoff.Retry(operation, backoff.NewExponentialBackOff()) +}