From f41390f5a9e1f10b2e0bc89a8efa40494e0e3d66 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 10 Dec 2024 11:59:07 +0800 Subject: [PATCH 1/5] update --- cdc/redo/writer/memory/encoding_worker.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index f62f236f888..8eff1055433 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -186,9 +186,10 @@ func (e *encodingWorkerGroup) input( return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - case e.inputChs[idx] <- event: - return nil + default: } + e.inputChs[idx] <- event + return nil } func (e *encodingWorkerGroup) output( @@ -199,9 +200,10 @@ func (e *encodingWorkerGroup) output( return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - case e.outputCh <- event: - return nil + default: } + e.outputCh <- event + return nil } func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error { @@ -224,8 +226,9 @@ func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error { return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - case <-flushCh: + default: } + <-flushCh return nil } @@ -248,8 +251,9 @@ func (e *encodingWorkerGroup) broadcastAndWaitEncoding(ctx context.Context) erro return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - case <-ch: + default: } + <-ch } return nil } From 2a5e887990f79b6b65fa3ee807af9b31ecdfd65c Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 11 Dec 2024 11:59:33 +0800 Subject: [PATCH 2/5] Revert "update" This reverts commit 91d2e3ceec3d2b34f3d752cd26e11025b07eacf9. --- cdc/redo/writer/memory/encoding_worker.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index 8eff1055433..f62f236f888 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -186,10 +186,9 @@ func (e *encodingWorkerGroup) input( return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - default: + case e.inputChs[idx] <- event: + return nil } - e.inputChs[idx] <- event - return nil } func (e *encodingWorkerGroup) output( @@ -200,10 +199,9 @@ func (e *encodingWorkerGroup) output( return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - default: + case e.outputCh <- event: + return nil } - e.outputCh <- event - return nil } func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error { @@ -226,9 +224,8 @@ func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error { return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - default: + case <-flushCh: } - <-flushCh return nil } @@ -251,9 +248,8 @@ func (e *encodingWorkerGroup) broadcastAndWaitEncoding(ctx context.Context) erro return ctx.Err() case err := <-e.closed: return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") - default: + case <-ch: } - <-ch } return nil } From ee170890e6735a377aec217af6bda8a964101491 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 11 Dec 2024 12:06:54 +0800 Subject: [PATCH 3/5] update ut --- cdc/redo/writer/memory/mem_log_writer_test.go | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 12f34ce11e8..115d126b341 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -106,12 +106,30 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { functions := map[string]func(error){ "WriteEvents": func(expected error) { - err := lw.WriteEvents(ctx, events...) - require.ErrorIs(t, errors.Cause(err), expected) + if expected == nil { + err := lw.WriteEvents(ctx, events...) + require.ErrorIs(t, errors.Cause(err), expected) + } else { + require.Eventually( + t, func() bool { + err := lw.WriteEvents(ctx, events...) + return errors.Is(errors.Cause(err), expected) + }, time.Second*2, time.Microsecond*10, + ) + } }, "FlushLog": func(expected error) { - err := lw.FlushLog(ctx) - require.ErrorIs(t, errors.Cause(err), expected) + if expected == nil { + err := lw.FlushLog(ctx) + require.ErrorIs(t, errors.Cause(err), expected) + } else { + require.Eventually( + t, func() bool { + err := lw.WriteEvents(ctx, events...) + return errors.Is(errors.Cause(err), expected) + }, time.Second*2, time.Microsecond*10, + ) + } }, } firstCall := true From 2385b09dd58b33b6cd25880691763ba02fc41a2e Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 16 Dec 2024 14:30:38 +0800 Subject: [PATCH 4/5] update --- cdc/redo/writer/memory/mem_log_writer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 115d126b341..a3d7f361983 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -108,7 +108,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { "WriteEvents": func(expected error) { if expected == nil { err := lw.WriteEvents(ctx, events...) - require.ErrorIs(t, errors.Cause(err), expected) + require.Nil(t, err) } else { require.Eventually( t, func() bool { @@ -121,7 +121,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { "FlushLog": func(expected error) { if expected == nil { err := lw.FlushLog(ctx) - require.ErrorIs(t, errors.Cause(err), expected) + require.Nil(t, err) } else { require.Eventually( t, func() bool { From 507bd3dbe83588158942dc27bcfbd6111dd2b740 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 17 Dec 2024 15:30:21 +0800 Subject: [PATCH 5/5] update --- cdc/redo/writer/memory/mem_log_writer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index a3d7f361983..5914328b4a1 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -108,7 +108,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { "WriteEvents": func(expected error) { if expected == nil { err := lw.WriteEvents(ctx, events...) - require.Nil(t, err) + require.NoError(t, err) } else { require.Eventually( t, func() bool { @@ -121,7 +121,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { "FlushLog": func(expected error) { if expected == nil { err := lw.FlushLog(ctx) - require.Nil(t, err) + require.NoError(t, err) } else { require.Eventually( t, func() bool {