Skip to content

Commit

Permalink
fix: handle 429 errors gracefully, with re-tries, in replicate-logs (#33
Browse files Browse the repository at this point in the history
)

* fix: handle 429 errors gracefully, with re-tries, in replicate-logs

AB#9913

* fix: take changes supporting better ratelimiting handling

* fix: minor spelling nits

* review: refactor a little to reduce cognitive load

* fix: missed an err check

---------

Co-authored-by: Robin Bryce <[email protected]>
  • Loading branch information
robinbryce and Robin Bryce authored Sep 20, 2024
1 parent e5a03de commit e21c5b7
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 51 deletions.
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@ require (
github.com/datatrails/go-datatrails-common v0.16.1
github.com/datatrails/go-datatrails-common-api-gen v0.4.6
github.com/datatrails/go-datatrails-logverification v0.1.7
github.com/datatrails/go-datatrails-merklelog/massifs v0.1.0
github.com/datatrails/go-datatrails-merklelog/massifs v0.1.1
github.com/datatrails/go-datatrails-merklelog/mmr v0.0.2
github.com/datatrails/go-datatrails-merklelog/mmrtesting v0.1.0
github.com/datatrails/go-datatrails-simplehash v0.0.5
github.com/gosuri/uiprogress v0.0.1
github.com/urfave/cli/v2 v2.27.1
github.com/zeebo/bencode v1.0.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
)

// replace (
// github.com/datatrails/go-datatrails-merklelog/massifs => ../go-datatrails-merklelog/massifs
// )
// replace github.com/datatrails/go-datatrails-merklelog/massifs => ../go-datatrails-merklelog/massifs

require (
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ github.com/datatrails/go-datatrails-common-api-gen v0.4.6 h1:yzviWC2jBOC3ItotQQl
github.com/datatrails/go-datatrails-common-api-gen v0.4.6/go.mod h1:OQN91xvlW6xcWTFvwsM2Nn4PZwFAIOE52FG7yRl4QPQ=
github.com/datatrails/go-datatrails-logverification v0.1.7 h1:HCZj3V2n2J7RzY39Ne2hyOCl2z5kjqm/D7ibOtRiSc4=
github.com/datatrails/go-datatrails-logverification v0.1.7/go.mod h1:yCYT82iv95QGgvXTxQRb9vSkHF653cjiDXXwOAw3I4s=
github.com/datatrails/go-datatrails-merklelog/massifs v0.1.0 h1:NzukXz65iplfjHMf74A+b76rgALgShV2DO/EyZHfn1Y=
github.com/datatrails/go-datatrails-merklelog/massifs v0.1.0/go.mod h1:RT4xRDMMMzEXPaSg87Dl7ODWd5bNxJiPptxRDcTxcVk=
github.com/datatrails/go-datatrails-merklelog/massifs v0.1.1 h1:ZdAIM0ojp1lFeocOtjwVLWx8fa3ytUKAmLCj4KFq9MU=
github.com/datatrails/go-datatrails-merklelog/massifs v0.1.1/go.mod h1:RT4xRDMMMzEXPaSg87Dl7ODWd5bNxJiPptxRDcTxcVk=
github.com/datatrails/go-datatrails-merklelog/mmr v0.0.2 h1:Jxov4/onoFiCISLQNSPy/nyt3USAEvUZpEjlScHJYKI=
github.com/datatrails/go-datatrails-merklelog/mmr v0.0.2/go.mod h1:+Oz8O6bns0rF6gr03xJzKTBzUzyskZ8Gics8/qeNzYk=
github.com/datatrails/go-datatrails-merklelog/mmrtesting v0.1.0 h1:q9RXtAGydXKSJjARnFObNu743cbfIOfERTXiiVa2tF4=
Expand Down Expand Up @@ -159,6 +159,8 @@ golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
Expand Down
164 changes: 119 additions & 45 deletions replicatelogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/datatrails/go-datatrails-common/cbor"
"github.com/datatrails/go-datatrails-common/logger"
"github.com/datatrails/go-datatrails-merklelog/massifs"
"github.com/gosuri/uiprogress"
"github.com/urfave/cli/v2"
"golang.org/x/exp/rand"
)

const (
// baseDefaultRetryDelay is the base delay for retrying transient errors. A little jitter is added.
// 429 errors which provide a valid Retry-After header will honor that header rather than use this.
baseDefaultRetryDelay = 2 * time.Second
defaultConcurrency = 5
)

var (
Expand Down Expand Up @@ -75,6 +84,21 @@ changes are read from standard input.`,
Value: false,
Aliases: []string{"p"},
},
&cli.IntFlag{
Name: "retries",
Aliases: []string{"r"},
Value: -1, // -1 means no limit
Usage: `
Set a maximum number of retries for transient error conditions. Set 0 to disable retries.
By default transient errors are re-tried without limit, and if the error is 429, the Retry-After header is honored.`,
},
&cli.IntFlag{
Name: "concurrency",
Value: defaultConcurrency,
Aliases: []string{"c"},
Usage: fmt.Sprintf(
`The number of concurrent replication operations to run, defaults to %d. A high number is a sure way to get rate limited`, defaultConcurrency),
},
},
Action: func(cCtx *cli.Context) error {
cmd := &CmdCtx{}
Expand All @@ -99,58 +123,108 @@ changes are read from standard input.`,
}
progress := newProgressor(cCtx, "tenants", len(changes))

var wg sync.WaitGroup
errChan := make(chan error, len(changes)) // buffered so it doesn't block

for _, change := range changes {
wg.Add(1)
go func(change TenantMassif, errChan chan<- error) {
defer wg.Done()
defer progress.Completed()

replicator, err := NewVerifiedReplica(cCtx, cmd.Clone())
if err != nil {
errChan <- err
return
}
endMassif := uint32(change.Massif)
startMassif := uint32(0)
if cCtx.IsSet("ancestors") && uint32(cCtx.Int("ancestors")) < endMassif {
startMassif = endMassif - uint32(cCtx.Int("ancestors"))
}

err = replicator.ReplicateVerifiedUpdates(
context.Background(),
change.Tenant, startMassif, endMassif,
)
if err != nil {
errChan <- err
}
}(change, errChan)
concurency := min(len(changes), max(1, cCtx.Int("concurrency")))
for i := 0; i < len(changes); i += concurency {
err = replicateChanges(cCtx, cmd, changes[i:min(i+concurency, len(changes))], progress)
if err != nil {
return err
}
}

// the error channel is buffered enough for each tenant, so this will not get deadlocked
wg.Wait()
close(errChan)

var errs []error
for err := range errChan {
cmd.log.Infof(err.Error())
errs = append(errs, err)
}
if len(errs) > 0 {
return errs[0]
}
if len(changes) == 1 {
cmd.log.Infof("replication complete for tenant %s", changes[0].Tenant)
} else {
cmd.log.Infof("replication complete for %d tenants", len(changes))
}
return nil
},
}
}

// replicateChanges replicates the changes for the provided slice of tenants,
// starting one goroutine per tenant and waiting for all of them to finish.
//
// Parallelism is limited by the caller, which breaks the total set of changes
// into smaller slices and calls this function once per slice.
//
// Errors identified by massifs.IsRateLimiting are treated as transient and
// retried. The "retries" flag bounds the number of retries per tenant: a
// negative value means no limit and 0 disables retries. Before each retry the
// goroutine waits for the delay reported by massifs.IsRateLimiting, or for
// defaultRetryDelay when no delay is reported. Any other error is final for
// that tenant.
//
// Every failure is logged; the first error read from the error channel is
// returned. nil is returned when all tenants replicated successfully.
func replicateChanges(cCtx *cli.Context, cmd *CmdCtx, changes []TenantMassif, progress Progresser) error {

	var wg sync.WaitGroup
	errChan := make(chan error, len(changes)) // buffered so it doesn't block

	for _, change := range changes {
		wg.Add(1)
		go func(change TenantMassif, errChan chan<- error) {
			defer wg.Done()
			defer progress.Completed()

			// Any negative value is normalized to -1, meaning "no limit".
			retries := max(-1, cCtx.Int("retries"))
			for {

				replicator, startMassif, endMassif, err := initReplication(cCtx, cmd, change)
				if err != nil {
					errChan <- err
					return
				}

				err = replicator.ReplicateVerifiedUpdates(
					context.Background(),
					change.Tenant, startMassif, endMassif,
				)
				if err == nil {
					return
				}

				// 429 is the only transient error we currently re-try
				retryDelay, ok := massifs.IsRateLimiting(err)
				if !ok || retries == 0 {
					// not transient, or the retry budget is exhausted
					errChan <- err
					return
				}
				if retryDelay == 0 {
					retryDelay = defaultRetryDelay(err)
				}

				// Only a finite budget is counted down; the unlimited case
				// stays at -1 and is reported as such.
				if retries > 0 {
					retries--
				}
				cmd.log.Infof("retrying in %s, remaining: %d", retryDelay, retries)

				// Actually wait before the next attempt, otherwise a rate
				// limited request is immediately re-issued in a tight loop.
				time.Sleep(retryDelay)
			}
		}(change, errChan)
	}

	// the error channel is buffered enough for each tenant, so this will not get deadlocked
	wg.Wait()
	close(errChan)

	var errs []error
	for err := range errChan {
		// The error text is passed as an argument, never as the format
		// string, so any '%' in it is logged verbatim.
		cmd.log.Infof("%v", err)
		errs = append(errs, err)
	}
	if len(errs) > 0 {
		return errs[0]
	}
	if len(changes) == 1 {
		cmd.log.Infof("replication complete for tenant %s", changes[0].Tenant)
	} else {
		cmd.log.Infof("replication complete for %d tenants", len(changes))
	}
	return nil
}

// initReplication creates a fresh VerifiedReplica from a clone of cmd and
// works out the massif range to replicate for change.
//
// The range ends at change.Massif. It starts at 0 unless the "ancestors" flag
// is set to a value smaller than the end massif, in which case it starts that
// many massifs before the end.
//
// Returns the replica, the start massif and the end massif; on failure to
// create the replica the error is returned with zero values.
func initReplication(cCtx *cli.Context, cmd *CmdCtx, change TenantMassif) (*VerifiedReplica, uint32, uint32, error) {
	replica, err := NewVerifiedReplica(cCtx, cmd.Clone())
	if err != nil {
		return nil, 0, 0, err
	}

	last := uint32(change.Massif)
	var first uint32
	if cCtx.IsSet("ancestors") {
		// Only narrow the range when the requested depth fits below the end.
		if depth := uint32(cCtx.Int("ancestors")); depth < last {
			first = last - depth
		}
	}
	return replica, first, last, nil
}

func defaultRetryDelay(_ error) time.Duration {
// give the delay some jitter, this is universally a good practice
return baseDefaultRetryDelay + time.Duration(rand.Intn(100))*time.Millisecond
}

func newProgressor(cCtx *cli.Context, barName string, increments int) Progresser {

if !cCtx.Bool("progress") {
Expand Down
Loading

0 comments on commit e21c5b7

Please sign in to comment.