From ea16480b2ef2b5f70711d2cf3e41ef0743d08c9a Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 10 Dec 2024 11:59:07 +0800 Subject: [PATCH 1/5] update --- cdc/redo/writer/memory/encoding_worker.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index f62f236f888..8eff1055433 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -186,9 +186,10 @@ func (e *encodingWorkerGroup) input( return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - case e.inputChs[idx] <- event: - return nil + default: } + e.inputChs[idx] <- event + return nil } func (e *encodingWorkerGroup) output( @@ -199,9 +200,10 @@ func (e *encodingWorkerGroup) output( return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - case e.outputCh <- event: - return nil + default: } + e.outputCh <- event + return nil } func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error { @@ -224,8 +226,9 @@ func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error { return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - case <-flushCh: + default: } + <-flushCh return nil } @@ -248,8 +251,9 @@ func (e *encodingWorkerGroup) broadcastAndWaitEncoding(ctx context.Context) erro return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - case <-ch: + default: } + <-ch } return nil } From 56784089948be84438eb7869ef2aefcf5bf191da Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 11 Dec 2024 11:59:33 +0800 Subject: [PATCH 2/5] Revert "update" This reverts commit 91d2e3ceec3d2b34f3d752cd26e11025b07eacf9. --- cdc/redo/writer/memory/encoding_worker.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index 8eff1055433..f62f236f888 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -186,10 +186,9 @@ func (e *encodingWorkerGroup) input( return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - default: + case e.inputChs[idx] <- event: + return nil } - e.inputChs[idx] <- event - return nil } func (e *encodingWorkerGroup) output( @@ -200,10 +199,9 @@ func (e *encodingWorkerGroup) output( return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - default: + case e.outputCh <- event: + return nil } - e.outputCh <- event - return nil } func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error { @@ -226,9 +224,8 @@ func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error { return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - default: + case <-flushCh: } - <-flushCh return nil } @@ -251,9 +248,8 @@ func (e *encodingWorkerGroup) broadcastAndWaitEncoding(ctx context.Context) erro return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - default: + case <-ch: } - <-ch } return nil } From ea7c0232947ff954ff38d4841b93ef8c82454d84 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 11 Dec 2024 12:06:54 +0800 Subject: [PATCH 3/5] update ut --- cdc/redo/writer/memory/mem_log_writer_test.go | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 238830af1e6..5888f29447b 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -94,12 +94,30 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { functions := map[string]func(error){ "WriteEvents": func(expected error) { - err := lw.WriteEvents(ctx, events...) - require.ErrorIs(t, errors.Cause(err), expected) + if expected == nil { + err := lw.WriteEvents(ctx, events...) + require.ErrorIs(t, errors.Cause(err), expected) + } else { + require.Eventually( + t, func() bool { + err := lw.WriteEvents(ctx, events...) + return errors.Is(errors.Cause(err), expected) + }, time.Second*2, time.Microsecond*10, + ) + } }, "FlushLog": func(expected error) { - err := lw.FlushLog(ctx) - require.ErrorIs(t, errors.Cause(err), expected) + if expected == nil { + err := lw.FlushLog(ctx) + require.ErrorIs(t, errors.Cause(err), expected) + } else { + require.Eventually( + t, func() bool { + err := lw.WriteEvents(ctx, events...) + return errors.Is(errors.Cause(err), expected) + }, time.Second*2, time.Microsecond*10, + ) + } }, } firstCall := true From a6cefc683dba9e990c6cea2614ac8df7753eac78 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 16 Dec 2024 14:30:38 +0800 Subject: [PATCH 4/5] update --- cdc/redo/writer/memory/mem_log_writer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 5888f29447b..ffa0e7880c7 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -96,7 +96,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { "WriteEvents": func(expected error) { if expected == nil { err := lw.WriteEvents(ctx, events...) - require.ErrorIs(t, errors.Cause(err), expected) + require.Nil(t, err) } else { require.Eventually( t, func() bool { @@ -109,7 +109,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { "FlushLog": func(expected error) { if expected == nil { err := lw.FlushLog(ctx) - require.ErrorIs(t, errors.Cause(err), expected) + require.Nil(t, err) } else { require.Eventually( t, func() bool { From a23c84b3c9010e990cb49f725539648cf9155087 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 17 Dec 2024 15:30:21 +0800 Subject: [PATCH 5/5] update --- cdc/redo/writer/memory/mem_log_writer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index ffa0e7880c7..e1f2d80c33a 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -96,7 +96,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { "WriteEvents": func(expected error) { if expected == nil { err := lw.WriteEvents(ctx, events...) - require.Nil(t, err) + require.NoError(t, err) } else { require.Eventually( t, func() bool { @@ -109,7 +109,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { "FlushLog": func(expected error) { if expected == nil { err := lw.FlushLog(ctx) - require.Nil(t, err) + require.NoError(t, err) } else { require.Eventually( t, func() bool {