Skip to content

Commit

Permalink
perf(client): reuse snpb.SubscribeResponse in RPC handler (#719)
Browse files Browse the repository at this point in the history
### What this PR does

This pull request optimizes
`internal/storagenode/client.(*LogClient).Subscribe`. It reuses
`snpb.SubscribeResponse` to reduce heap allocations. Note that it does not reuse
the byte slice in the `snpb.SubscribeResponse`. It only reuses the struct
`snpb.SubscribeResponse` itself.
  • Loading branch information
ijsong authored Mar 5, 2024
2 parents fbf0c17 + 890508d commit e98ac54
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
4 changes: 3 additions & 1 deletion internal/storagenode/client/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ func (c *LogClient) Subscribe(ctx context.Context, tpid types.TopicID, lsid type
defer func() {
close(out)
}()
var rsp snpb.SubscribeResponse
for {
rsp, rpcErr := stream.Recv()
rsp.Reset()
rpcErr := stream.RecvMsg(&rsp)
err := verrors.FromStatusError(rpcErr)
result := SubscribeResult{Error: err}
if err == nil {
Expand Down
23 changes: 23 additions & 0 deletions internal/storagenode/client/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,29 @@ func newMockStorageNodeServiceClient(ctrl *gomock.Controller, sn *storageNode, t
).DoAndReturn(func(_ context.Context, req *snpb.SubscribeRequest, opts ...grpc.CallOption) (snpb.LogIO_SubscribeClient, error) {
nextGLSN := req.GetGLSNBegin()
stream := mock.NewMockLogIO_SubscribeClient(ctrl)
stream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(
func(m any) error {
sn.mu.Lock()
defer sn.mu.Unlock()
var glsns []types.GLSN
for glsn := range sn.logEntries {
glsns = append(glsns, glsn)
}
sort.Sort(byGLSN(glsns))
for _, glsn := range glsns {
if glsn < nextGLSN {
continue
}
nextGLSN = glsn + 1
rsp := m.(*snpb.SubscribeResponse)
rsp.GLSN = glsn
rsp.LLSN = sn.glsnToLLSN[glsn]
rsp.Payload = sn.logEntries[glsn]
return nil
}
return io.EOF
},
).AnyTimes()
stream.EXPECT().Recv().DoAndReturn(
func() (*snpb.SubscribeResponse, error) {
sn.mu.Lock()
Expand Down

0 comments on commit e98ac54

Please sign in to comment.