Skip to content

Commit

Permalink
fix(varlogtest): log stream appender returns missing errors in the ca…
Browse files Browse the repository at this point in the history
…llback

This PR fixed the log stream appender's bug in the varlogtest package. It had the bug that the
callback in the log stream appender didn't return an error.
  • Loading branch information
ijsong committed Sep 27, 2023
1 parent 73d6a91 commit 9d0b203
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (c *testLog) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamI
go func() {
defer lsa.wg.Done()
for qe := range lsa.queue.ch {
qe.callback(qe.result.Metadata, nil)
qe.callback(qe.result.Metadata, qe.result.Err)
lsa.queue.cv.L.Lock()
lsa.queue.cv.Broadcast()
lsa.queue.cv.L.Unlock()
Expand Down
27 changes: 22 additions & 5 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,11 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {

tcs := []struct {
name string
testf func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID)
testf func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID)
}{
{
name: "Closed",
testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)

Expand All @@ -167,7 +167,7 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
},
{
name: "AppendLogs",
testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)
defer lsa.Close()
Expand All @@ -181,9 +181,26 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
}
},
},
{
name: "CouldNotAppend_Sealed",
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
_, err := vadm.Seal(context.Background(), tpid, lsid)
require.NoError(t, err)

lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)
defer lsa.Close()

cb := func(_ []varlogpb.LogEntryMeta, err error) {
assert.ErrorIs(t, err, verrors.ErrSealed)
}
err = lsa.AppendBatch([][]byte{[]byte("foo")}, cb)
require.NoError(t, err)
},
},
{
name: "Manager",
testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
mgr := mlsa.New(vcli)

_, err := mgr.Get(tpid+1, lsid)
Expand Down Expand Up @@ -264,7 +281,7 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
require.Equal(t, varlogpb.LogStreamStatusRunning, lsd.Status)
require.Len(t, lsd.Replicas, replicationFactor)

tc.testf(t, vlg, td.TopicID, lsd.LogStreamID)
tc.testf(t, adm, vlg, td.TopicID, lsd.LogStreamID)
})
}
}
Expand Down

0 comments on commit 9d0b203

Please sign in to comment.