From 1b8f5a0123be567305ea7159391eac474baf2477 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Sat, 24 Feb 2024 21:39:32 +0900 Subject: [PATCH] perf(client): reuse snpb.SubscribeResponse in RPC handler This pull request optimizes `internal/storagenode/client.(*LogClient).Subscribe`. It reuses `snpb.SubscribeResponse` to reduce heap allocations. Note that it does not reuse the byte slice in the `snpb.SubscribeResponse`. It only reuses the struct `snpb.SubscribeResponse` itself. --- internal/storagenode/client/log_client.go | 4 +++- .../storagenode/client/log_client_test.go | 23 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/internal/storagenode/client/log_client.go b/internal/storagenode/client/log_client.go index ebfa87e0c..730b7d5ec 100644 --- a/internal/storagenode/client/log_client.go +++ b/internal/storagenode/client/log_client.go @@ -97,8 +97,10 @@ func (c *LogClient) Subscribe(ctx context.Context, tpid types.TopicID, lsid type defer func() { close(out) }() + var rsp snpb.SubscribeResponse for { - rsp, rpcErr := stream.Recv() + rsp.Reset() + rpcErr := stream.RecvMsg(&rsp) err := verrors.FromStatusError(rpcErr) result := SubscribeResult{Error: err} if err == nil { diff --git a/internal/storagenode/client/log_client_test.go b/internal/storagenode/client/log_client_test.go index a4d30e440..5eed92f82 100644 --- a/internal/storagenode/client/log_client_test.go +++ b/internal/storagenode/client/log_client_test.go @@ -104,6 +104,29 @@ func newMockStorageNodeServiceClient(ctrl *gomock.Controller, sn *storageNode, t ).DoAndReturn(func(_ context.Context, req *snpb.SubscribeRequest, opts ...grpc.CallOption) (snpb.LogIO_SubscribeClient, error) { nextGLSN := req.GetGLSNBegin() stream := mock.NewMockLogIO_SubscribeClient(ctrl) + stream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn( + func(m any) error { + sn.mu.Lock() + defer sn.mu.Unlock() + var glsns []types.GLSN + for glsn := range sn.logEntries { + glsns = append(glsns, glsn) + } + sort.Sort(byGLSN(glsns)) + for _, glsn := range glsns { + if glsn < nextGLSN { + continue + } + nextGLSN = glsn + 1 + rsp := m.(*snpb.SubscribeResponse) + rsp.GLSN = glsn + rsp.LLSN = sn.glsnToLLSN[glsn] + rsp.Payload = sn.logEntries[glsn] + return nil + } + return io.EOF + }, + ).AnyTimes() stream.EXPECT().Recv().DoAndReturn( func() (*snpb.SubscribeResponse, error) { sn.mu.Lock()