diff --git a/internal/storagenode/client/log_client.go b/internal/storagenode/client/log_client.go index ebfa87e0c..730b7d5ec 100644 --- a/internal/storagenode/client/log_client.go +++ b/internal/storagenode/client/log_client.go @@ -97,8 +97,10 @@ func (c *LogClient) Subscribe(ctx context.Context, tpid types.TopicID, lsid type defer func() { close(out) }() + var rsp snpb.SubscribeResponse for { - rsp, rpcErr := stream.Recv() + rsp.Reset() + rpcErr := stream.RecvMsg(&rsp) err := verrors.FromStatusError(rpcErr) result := SubscribeResult{Error: err} if err == nil { diff --git a/internal/storagenode/client/log_client_test.go b/internal/storagenode/client/log_client_test.go index a4d30e440..5eed92f82 100644 --- a/internal/storagenode/client/log_client_test.go +++ b/internal/storagenode/client/log_client_test.go @@ -104,6 +104,29 @@ func newMockStorageNodeServiceClient(ctrl *gomock.Controller, sn *storageNode, t ).DoAndReturn(func(_ context.Context, req *snpb.SubscribeRequest, opts ...grpc.CallOption) (snpb.LogIO_SubscribeClient, error) { nextGLSN := req.GetGLSNBegin() stream := mock.NewMockLogIO_SubscribeClient(ctrl) + stream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn( + func(m any) error { + sn.mu.Lock() + defer sn.mu.Unlock() + var glsns []types.GLSN + for glsn := range sn.logEntries { + glsns = append(glsns, glsn) + } + sort.Sort(byGLSN(glsns)) + for _, glsn := range glsns { + if glsn < nextGLSN { + continue + } + nextGLSN = glsn + 1 + rsp := m.(*snpb.SubscribeResponse) + rsp.GLSN = glsn + rsp.LLSN = sn.glsnToLLSN[glsn] + rsp.Payload = sn.logEntries[glsn] + return nil + } + return io.EOF + }, + ).AnyTimes() stream.EXPECT().Recv().DoAndReturn( func() (*snpb.SubscribeResponse, error) { sn.mu.Lock()