Skip to content

Commit

Permalink
fix(varlogtest): log stream appender can be closed in the callback
Browse files Browse the repository at this point in the history
This PR fixed the varlogtest to be able to close the log stream appender in the callback.
  • Loading branch information
ijsong committed Sep 27, 2023
1 parent 9d0b203 commit 3918475
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
11 changes: 6 additions & 5 deletions pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
type testLog struct {
vt *VarlogTest
closed bool
// wg is waitgroup to manage client-wise goroutines, for example, callbacks
// of log stream appenders.
wg sync.WaitGroup
}

var _ varlog.Log = (*testLog)(nil)
Expand All @@ -38,6 +41,7 @@ func (c *testLog) Close() error {
defer c.vt.cond.L.Unlock()
c.closed = true
c.vt.cond.Broadcast()
c.wg.Wait()
return nil
}

Expand Down Expand Up @@ -342,9 +346,9 @@ func (c *testLog) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamI
lsa.queue.ch = make(chan *queueEntry, pipelineSize)
lsa.queue.cv = sync.NewCond(&lsa.queue.mu)

lsa.wg.Add(1)
lsa.c.wg.Add(1)
go func() {
defer lsa.wg.Done()
defer lsa.c.wg.Done()
for qe := range lsa.queue.ch {
qe.callback(qe.result.Metadata, qe.result.Err)
lsa.queue.cv.L.Lock()
Expand Down Expand Up @@ -377,8 +381,6 @@ type logStreamAppender struct {
cv *sync.Cond
mu sync.Mutex
}

wg sync.WaitGroup
}

var _ varlog.LogStreamAppender = (*logStreamAppender)(nil)
Expand Down Expand Up @@ -417,7 +419,6 @@ func (lsa *logStreamAppender) Close() {
}
lsa.closed.value = true
close(lsa.queue.ch)
lsa.wg.Wait()
}

type errSubscriber struct {
Expand Down
17 changes: 17 additions & 0 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,23 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
require.NoError(t, err)
},
},
{
name: "CloseInCallback",
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
err = lsa.AppendBatch([][]byte{[]byte("foo")}, func(lem []varlogpb.LogEntryMeta, err error) {
defer wg.Done()
assert.NoError(t, err)
lsa.Close()
})
require.NoError(t, err)
wg.Wait()
},
},
{
name: "Manager",
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
Expand Down

0 comments on commit 3918475

Please sign in to comment.