Skip to content

Commit

Permalink
ticdc(redo, sink): return correct error in redo writer & fix default …
Browse files Browse the repository at this point in the history
…retryer (#11747)

close #11744
  • Loading branch information
CharlesCheung96 authored Nov 16, 2024
1 parent b38183b commit ea35677
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 18 deletions.
2 changes: 1 addition & 1 deletion cdc/redo/writer/memory/encoding_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Error(err))
if err != nil && errors.Cause(err) != context.Canceled {
if err != nil {
e.closed <- err
}
close(e.closed)
Expand Down
26 changes: 22 additions & 4 deletions cdc/redo/writer/memory/mem_log_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -99,10 +100,27 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) {
})
require.NoError(t, err)

require.ErrorIs(t, lw.Close(), context.Canceled)
// duplicate close should return the same error
require.ErrorIs(t, lw.Close(), context.Canceled)

err = lw.WriteEvents(ctx, events...)
require.NoError(t, err)
err = lw.FlushLog(ctx)
require.NoError(t, err)
functions := map[string]func(error){
"WriteEvents": func(expected error) {
err := lw.WriteEvents(ctx, events...)
require.ErrorIs(t, errors.Cause(err), expected)
},
"FlushLog": func(expected error) {
err := lw.FlushLog(ctx)
require.ErrorIs(t, errors.Cause(err), expected)
},
}
firstCall := true
for _, f := range functions {
if firstCall {
firstCall = false
f(context.Canceled)
} else {
f(nil)
}
}
}
2 changes: 1 addition & 1 deletion pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func IsBlackholeStorage(scheme string) bool {

// InitExternalStorage init an external storage.
var InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
s, err := util.GetExternalStorageWithTimeout(ctx, uri.String(), DefaultTimeout)
s, err := util.GetExternalStorageWithDefaultTimeout(ctx, uri.String())
if err != nil {
return nil, errors.WrapError(errors.ErrStorageInitialize, err,
fmt.Sprintf("can't init external storage for %s", uri.String()))
Expand Down
6 changes: 1 addition & 5 deletions pkg/sink/kafka/claimcheck/claim_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ import (
"go.uber.org/zap"
)

const (
defaultTimeout = 5 * time.Minute
)

// ClaimCheck manage send message to the claim-check external storage.
type ClaimCheck struct {
storage storage.ExternalStorage
Expand All @@ -59,7 +55,7 @@ func New(ctx context.Context, config *config.LargeMessageHandleConfig, changefee
zap.String("storageURI", util.MaskSensitiveDataInURI(config.ClaimCheckStorageURI)))

start := time.Now()
externalStorage, err := util.GetExternalStorageWithTimeout(ctx, config.ClaimCheckStorageURI, defaultTimeout)
externalStorage, err := util.GetExternalStorageWithDefaultTimeout(ctx, config.ClaimCheckStorageURI)
if err != nil {
log.Error("create external storage failed",
zap.String("namespace", changefeedID.Namespace),
Expand Down
16 changes: 9 additions & 7 deletions pkg/util/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,27 @@ import (
"golang.org/x/sync/errgroup"
)

const defaultTimeout = 5 * time.Minute

// GetExternalStorageFromURI creates a new storage.ExternalStorage from a uri.
func GetExternalStorageFromURI(
ctx context.Context, uri string,
) (storage.ExternalStorage, error) {
return GetExternalStorage(ctx, uri, nil, DefaultS3Retryer())
}

// GetExternalStorageWithTimeout creates a new storage.ExternalStorage from a uri
// GetExternalStorageWithDefaultTimeout creates a new storage.ExternalStorage from a uri
// without retry. It is the caller's responsibility to set timeout to the context.
func GetExternalStorageWithTimeout(
ctx context.Context, uri string, timeout time.Duration,
) (storage.ExternalStorage, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
func GetExternalStorageWithDefaultTimeout(ctx context.Context, uri string) (storage.ExternalStorage, error) {
ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
s, err := GetExternalStorage(ctx, uri, nil, nil)
// total retry time is [1<<7, 1<<8] = [128, 256] + 30*6 = [308, 436] seconds
r := NewS3Retryer(7, 1*time.Second, 2*time.Second)
s, err := GetExternalStorage(ctx, uri, nil, r)

return &extStorageWithTimeout{
ExternalStorage: s,
timeout: timeout,
timeout: defaultTimeout,
}, err
}

Expand Down

0 comments on commit ea35677

Please sign in to comment.