From 3918475fd9e6f6c3e361effda5b80f43c04aad04 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Wed, 27 Sep 2023 20:54:37 +0900 Subject: [PATCH] fix(varlogtest): log stream appender can be closed in the callback This PR fixed the varlogtest to be able to close the log stream appender in the callback. --- pkg/varlogtest/log.go | 11 ++++++----- pkg/varlogtest/varlogtest_test.go | 17 +++++++++++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/pkg/varlogtest/log.go b/pkg/varlogtest/log.go index e2591f4ca..e30cf42be 100644 --- a/pkg/varlogtest/log.go +++ b/pkg/varlogtest/log.go @@ -16,6 +16,9 @@ import ( type testLog struct { vt *VarlogTest closed bool + // wg is waitgroup to manage client-wise goroutines, for example, callbacks + // of log stream appenders. + wg sync.WaitGroup } var _ varlog.Log = (*testLog)(nil) @@ -38,6 +41,7 @@ func (c *testLog) Close() error { defer c.vt.cond.L.Unlock() c.closed = true c.vt.cond.Broadcast() + c.wg.Wait() return nil } @@ -342,9 +346,9 @@ func (c *testLog) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamI lsa.queue.ch = make(chan *queueEntry, pipelineSize) lsa.queue.cv = sync.NewCond(&lsa.queue.mu) - lsa.wg.Add(1) + lsa.c.wg.Add(1) go func() { - defer lsa.wg.Done() + defer lsa.c.wg.Done() for qe := range lsa.queue.ch { qe.callback(qe.result.Metadata, qe.result.Err) lsa.queue.cv.L.Lock() @@ -377,8 +381,6 @@ type logStreamAppender struct { cv *sync.Cond mu sync.Mutex } - - wg sync.WaitGroup } var _ varlog.LogStreamAppender = (*logStreamAppender)(nil) @@ -417,7 +419,6 @@ func (lsa *logStreamAppender) Close() { } lsa.closed.value = true close(lsa.queue.ch) - lsa.wg.Wait() } type errSubscriber struct { diff --git a/pkg/varlogtest/varlogtest_test.go b/pkg/varlogtest/varlogtest_test.go index e1bbfd27b..ca8545f00 100644 --- a/pkg/varlogtest/varlogtest_test.go +++ b/pkg/varlogtest/varlogtest_test.go @@ -198,6 +198,23 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) { require.NoError(t, err) }, }, + { + name: "CloseInCallback", + testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) { + lsa, err := vcli.NewLogStreamAppender(tpid, lsid) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + err = lsa.AppendBatch([][]byte{[]byte("foo")}, func(lem []varlogpb.LogEntryMeta, err error) { + defer wg.Done() + assert.NoError(t, err) + lsa.Close() + }) + require.NoError(t, err) + wg.Wait() + }, + }, { name: "Manager", testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {