Skip to content

Commit

Permalink
fix(varlogtest): log stream appender returns missing errors in the ca…
Browse files Browse the repository at this point in the history
…llback (#594)

### What this PR does

This PR fixed the log stream appender's bug in the varlogtest package. It had
the bug that the callback in the log stream appender didn't return an error.
  • Loading branch information
ijsong authored Oct 4, 2023
2 parents 27cd3bb + 9d0b203 commit d2e88ee
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (c *testLog) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamI
go func() {
defer lsa.wg.Done()
for qe := range lsa.queue.ch {
qe.callback(qe.result.Metadata, nil)
qe.callback(qe.result.Metadata, qe.result.Err)
lsa.queue.cv.L.Lock()
lsa.queue.cv.Broadcast()
lsa.queue.cv.L.Unlock()
Expand Down
27 changes: 22 additions & 5 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,11 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {

tcs := []struct {
name string
testf func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID)
testf func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID)
}{
{
name: "Closed",
testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)

Expand All @@ -167,7 +167,7 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
},
{
name: "AppendLogs",
testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)
defer lsa.Close()
Expand All @@ -181,9 +181,26 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
}
},
},
{
name: "CouldNotAppend_Sealed",
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
_, err := vadm.Seal(context.Background(), tpid, lsid)
require.NoError(t, err)

lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)
defer lsa.Close()

cb := func(_ []varlogpb.LogEntryMeta, err error) {
assert.ErrorIs(t, err, verrors.ErrSealed)
}
err = lsa.AppendBatch([][]byte{[]byte("foo")}, cb)
require.NoError(t, err)
},
},
{
name: "Manager",
testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
mgr := mlsa.New(vcli)

_, err := mgr.Get(tpid+1, lsid)
Expand Down Expand Up @@ -264,7 +281,7 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
require.Equal(t, varlogpb.LogStreamStatusRunning, lsd.Status)
require.Len(t, lsd.Replicas, replicationFactor)

tc.testf(t, vlg, td.TopicID, lsd.LogStreamID)
tc.testf(t, adm, vlg, td.TopicID, lsd.LogStreamID)
})
}
}
Expand Down

0 comments on commit d2e88ee

Please sign in to comment.