Skip to content

Commit

Permalink
fix(varlogtest): log stream appender can be closed in the callback
Browse files Browse the repository at this point in the history
This PR fixed the varlogtest to be able to close the log stream appender in the callback.
  • Loading branch information
ijsong committed Sep 27, 2023
1 parent 9d0b203 commit e08371b
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 6 deletions.
44 changes: 39 additions & 5 deletions pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,26 @@ import (
type testLog struct {
vt *VarlogTest
closed bool

lsaPool struct {
lsaMap map[int64]*logStreamAppender
nextID int64
mu sync.Mutex

wg sync.WaitGroup
}
}

var _ varlog.Log = (*testLog)(nil)

func newTestLog(vt *VarlogTest) *testLog {
c := &testLog{
vt: vt,
}
c.lsaPool.lsaMap = make(map[int64]*logStreamAppender)
return c
}

func (c *testLog) lock() error {
c.vt.cond.L.Lock()
if c.closed {
Expand All @@ -38,6 +54,14 @@ func (c *testLog) Close() error {
defer c.vt.cond.L.Unlock()
c.closed = true
c.vt.cond.Broadcast()

c.lsaPool.mu.Lock()
defer c.lsaPool.mu.Unlock()
for _, lsa := range c.lsaPool.lsaMap {
lsa.Close()
}

c.lsaPool.wg.Wait()
return nil
}

Expand Down Expand Up @@ -342,9 +366,22 @@ func (c *testLog) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamI
lsa.queue.ch = make(chan *queueEntry, pipelineSize)
lsa.queue.cv = sync.NewCond(&lsa.queue.mu)

lsa.wg.Add(1)
c.lsaPool.mu.Lock()
defer c.lsaPool.mu.Unlock()
id := c.lsaPool.nextID
c.lsaPool.nextID++
c.lsaPool.lsaMap[id] = lsa

c.lsaPool.wg.Add(1)
go func() {
defer lsa.wg.Done()
defer func() {
c.lsaPool.wg.Done()

c.lsaPool.mu.Lock()
defer c.lsaPool.mu.Unlock()
delete(c.lsaPool.lsaMap, id)
}()

for qe := range lsa.queue.ch {
qe.callback(qe.result.Metadata, qe.result.Err)
lsa.queue.cv.L.Lock()
Expand Down Expand Up @@ -377,8 +414,6 @@ type logStreamAppender struct {
cv *sync.Cond
mu sync.Mutex
}

wg sync.WaitGroup
}

var _ varlog.LogStreamAppender = (*logStreamAppender)(nil)
Expand Down Expand Up @@ -417,7 +452,6 @@ func (lsa *logStreamAppender) Close() {
}
lsa.closed.value = true
close(lsa.queue.ch)
lsa.wg.Wait()
}

type errSubscriber struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/varlogtest/varlogtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (vt *VarlogTest) NewAdminClient() varlog.Admin {
}

func (vt *VarlogTest) NewLogClient() varlog.Log {
return &testLog{vt: vt}
return newTestLog(vt)
}

func (vt *VarlogTest) generateTopicID() types.TopicID {
Expand Down
33 changes: 33 additions & 0 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,39 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
require.NoError(t, err)
},
},
{
name: "CloseInCallback",
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
err = lsa.AppendBatch([][]byte{[]byte("foo")}, func(lem []varlogpb.LogEntryMeta, err error) {
defer wg.Done()
assert.NoError(t, err)
lsa.Close()
})
require.NoError(t, err)
wg.Wait()
},
},
{
name: "DoesNotCloseLogStreamAppender",
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
// Closing the log client will shut down the log stream appender forcefully.
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)

cb := func(_ []varlogpb.LogEntryMeta, err error) {
assert.NoError(t, err)
}
for i := 0; i < numLogs; i++ {
err := lsa.AppendBatch([][]byte{[]byte("foo")}, cb)
require.NoError(t, err)
}
},
},
{
name: "Manager",
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
Expand Down

0 comments on commit e08371b

Please sign in to comment.