From 8ef188664757e32ee89e12f93039fc8989251969 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Wed, 5 Jun 2024 11:26:44 +0900 Subject: [PATCH] perf(sn): reuse buffer for ReplicateRequest unmarshaling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improve unmarshaling performance by reusing buffers for ReplicateRequest in the backup replica. The protobuf message `github.com/kakao/varlog/proto/snpb.(ReplicateRequest)` has two slice fields—LLSN (`[]uint64`) and Data (`[][]byte`). The backup replica receives replicated log entries from the primary replica via the gRPC service `github.com/kakao/varlog/proto/snpb.(ReplicatorServer).Replicate`, which sends `ReplicateRequest` messages. Upon receiving a `ReplicateRequest`, the backup replica unmarshals the message, which involves growing slices for fields such as LLSN and Data. This growth causes copy overhead whenever the slice capacities need to expand. To address this, we introduce a new method, `ResetReuse`, for reusing slices instead of resetting them completely. The `ResetReuse` method shrinks the slice lengths while preserving their capacities, thus avoiding the overhead of reallocating memory. Example implementation: ```go type Message struct { Buffer []byte // Other fields } func (m *Message) Reset() { *m = Message{} } func (m *Message) ResetReuse() { s := m.Buffer[:0] *m = Message{} m.Buffer = s } ``` Risks: This approach has potential downsides. Since the heap space consumed by the slices is not reclaimed, the storage node's memory consumption may increase. Currently, there is no mechanism to shrink the heap usage. Additionally, this PR changes the generated code. The protobuf compiler can revert it, which is contrary to our intention. To catch this mistake, this PR includes a unit test (github.com/kakao/varlog/proto/snpb.TestReplicateRequest) to verify that the buffer backing the slices is reused. Resolves: #795 See also: #806 --- Makefile | 2 + internal/storagenode/replication_server.go | 16 +- proto/patches/snpb_replicator.patch | 545 +++++++++++++++++++++ proto/snpb/replicator.go | 11 + proto/snpb/replicator.pb.go | 109 ++++- proto/snpb/replicator_test.go | 169 +++++++ 6 files changed, 830 insertions(+), 22 deletions(-) create mode 100644 proto/patches/snpb_replicator.patch diff --git a/Makefile b/Makefile index 645a604a7..e66a6a736 100644 --- a/Makefile +++ b/Makefile @@ -114,6 +114,8 @@ $(PROTO_PBS): $(PROTO_SRCS) $(PROTOC) $(PROTO_INCS) \ --gogo_out=plugins=grpc,Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,paths=source_relative:. $$src ; \ done + $(MAKE) fmt + git apply -v proto/patches/*.patch proto-check: $(MAKE) proto diff --git a/internal/storagenode/replication_server.go b/internal/storagenode/replication_server.go index dfa9c776a..49b9f353e 100644 --- a/internal/storagenode/replication_server.go +++ b/internal/storagenode/replication_server.go @@ -93,15 +93,12 @@ type replicationServerTask struct { err error } -func newReplicationServerTask(req snpb.ReplicateRequest, err error) *replicationServerTask { - rst := replicationServerTaskPool.Get().(*replicationServerTask) - rst.req = req - rst.err = err - return rst +func newReplicationServerTask() *replicationServerTask { + return replicationServerTaskPool.Get().(*replicationServerTask) } func (rst *replicationServerTask) release() { - rst.req = snpb.ReplicateRequest{} + rst.req.ResetReuse() rst.err = nil replicationServerTaskPool.Put(rst) } @@ -113,11 +110,10 @@ func (rs *replicationServer) recv(ctx context.Context, stream snpb.Replicator_Re go func() { defer wg.Done() defer close(c) - req := &snpb.ReplicateRequest{} for { - req.Reset() - err := stream.RecvMsg(req) - rst := newReplicationServerTask(*req, err) + rst := newReplicationServerTask() + err := stream.RecvMsg(&rst.req) + rst.err = err select { case c <- rst: if err != nil { diff --git a/proto/patches/snpb_replicator.patch b/proto/patches/snpb_replicator.patch new file mode 100644 index 000000000..a87409c4f --- /dev/null +++ b/proto/patches/snpb_replicator.patch @@ -0,0 +1,545 @@ +diff --git a/proto/snpb/replicator.pb.go b/proto/snpb/replicator.pb.go +index 139a31eb..e5105c63 100644 +--- a/proto/snpb/replicator.pb.go ++++ b/proto/snpb/replicator.pb.go +@@ -10,6 +10,7 @@ import ( + io "io" + math "math" + math_bits "math/bits" ++ "slices" + strconv "strconv" + + _ "github.com/gogo/protobuf/gogoproto" +@@ -24,9 +25,11 @@ import ( + ) + + // Reference imports to suppress errors if they are not otherwise used. +-var _ = proto.Marshal +-var _ = fmt.Errorf +-var _ = math.Inf ++var ( ++ _ = proto.Marshal ++ _ = fmt.Errorf ++ _ = math.Inf ++) + + // This is a compile-time assertion to ensure that this generated file + // is compatible with the proto package it is being compiled against. +@@ -80,9 +83,11 @@ func (*ReplicateRequest) ProtoMessage() {} + func (*ReplicateRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{0} + } ++ + func (m *ReplicateRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReplicateRequest.Marshal(b, m, deterministic) +@@ -95,12 +100,15 @@ func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, er + return b[:n], nil + } + } ++ + func (m *ReplicateRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReplicateRequest.Merge(m, src) + } ++ + func (m *ReplicateRequest) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *ReplicateRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReplicateRequest.DiscardUnknown(m) + } +@@ -135,8 +143,7 @@ func (m *ReplicateRequest) GetData() [][]byte { + return nil + } + +-type ReplicateResponse struct { +-} ++type ReplicateResponse struct{} + + func (m *ReplicateResponse) Reset() { *m = ReplicateResponse{} } + func (m *ReplicateResponse) String() string { return proto.CompactTextString(m) } +@@ -144,9 +151,11 @@ func (*ReplicateResponse) ProtoMessage() {} + func (*ReplicateResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{1} + } ++ + func (m *ReplicateResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReplicateResponse.Marshal(b, m, deterministic) +@@ -159,12 +168,15 @@ func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, e + return b[:n], nil + } + } ++ + func (m *ReplicateResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReplicateResponse.Merge(m, src) + } ++ + func (m *ReplicateResponse) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *ReplicateResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ReplicateResponse.DiscardUnknown(m) + } +@@ -182,9 +194,11 @@ func (*SyncPosition) ProtoMessage() {} + func (*SyncPosition) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{2} + } ++ + func (m *SyncPosition) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncPosition.Marshal(b, m, deterministic) +@@ -197,12 +211,15 @@ func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) + return b[:n], nil + } + } ++ + func (m *SyncPosition) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncPosition.Merge(m, src) + } ++ + func (m *SyncPosition) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncPosition) XXX_DiscardUnknown() { + xxx_messageInfo_SyncPosition.DiscardUnknown(m) + } +@@ -239,9 +256,11 @@ func (*SyncRange) ProtoMessage() {} + func (*SyncRange) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{3} + } ++ + func (m *SyncRange) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncRange.Marshal(b, m, deterministic) +@@ -254,12 +273,15 @@ func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return b[:n], nil + } + } ++ + func (m *SyncRange) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncRange.Merge(m, src) + } ++ + func (m *SyncRange) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncRange) XXX_DiscardUnknown() { + xxx_messageInfo_SyncRange.DiscardUnknown(m) + } +@@ -305,9 +327,11 @@ func (*SyncInitRequest) ProtoMessage() {} + func (*SyncInitRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{4} + } ++ + func (m *SyncInitRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncInitRequest.Marshal(b, m, deterministic) +@@ -320,12 +344,15 @@ func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, err + return b[:n], nil + } + } ++ + func (m *SyncInitRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncInitRequest.Merge(m, src) + } ++ + func (m *SyncInitRequest) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncInitRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SyncInitRequest.DiscardUnknown(m) + } +@@ -381,9 +408,11 @@ func (*SyncInitResponse) ProtoMessage() {} + func (*SyncInitResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{5} + } ++ + func (m *SyncInitResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncInitResponse.Marshal(b, m, deterministic) +@@ -396,12 +425,15 @@ func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, er + return b[:n], nil + } + } ++ + func (m *SyncInitResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncInitResponse.Merge(m, src) + } ++ + func (m *SyncInitResponse) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncInitResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SyncInitResponse.DiscardUnknown(m) + } +@@ -428,9 +460,11 @@ func (*SyncStatus) ProtoMessage() {} + func (*SyncStatus) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{6} + } ++ + func (m *SyncStatus) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncStatus.Marshal(b, m, deterministic) +@@ -443,12 +477,15 @@ func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return b[:n], nil + } + } ++ + func (m *SyncStatus) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncStatus.Merge(m, src) + } ++ + func (m *SyncStatus) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncStatus) XXX_DiscardUnknown() { + xxx_messageInfo_SyncStatus.DiscardUnknown(m) + } +@@ -494,9 +531,11 @@ func (*SyncPayload) ProtoMessage() {} + func (*SyncPayload) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{7} + } ++ + func (m *SyncPayload) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncPayload.Marshal(b, m, deterministic) +@@ -509,12 +548,15 @@ func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) + return b[:n], nil + } + } ++ + func (m *SyncPayload) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncPayload.Merge(m, src) + } ++ + func (m *SyncPayload) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncPayload) XXX_DiscardUnknown() { + xxx_messageInfo_SyncPayload.DiscardUnknown(m) + } +@@ -548,9 +590,11 @@ func (*SyncReplicateRequest) ProtoMessage() {} + func (*SyncReplicateRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{8} + } ++ + func (m *SyncReplicateRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncReplicateRequest.Marshal(b, m, deterministic) +@@ -563,12 +607,15 @@ func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte + return b[:n], nil + } + } ++ + func (m *SyncReplicateRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncReplicateRequest.Merge(m, src) + } ++ + func (m *SyncReplicateRequest) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncReplicateRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SyncReplicateRequest.DiscardUnknown(m) + } +@@ -613,9 +660,11 @@ func (*SyncReplicateResponse) ProtoMessage() {} + func (*SyncReplicateResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{9} + } ++ + func (m *SyncReplicateResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncReplicateResponse.Marshal(b, m, deterministic) +@@ -628,12 +677,15 @@ func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byt + return b[:n], nil + } + } ++ + func (m *SyncReplicateResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncReplicateResponse.Merge(m, src) + } ++ + func (m *SyncReplicateResponse) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncReplicateResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SyncReplicateResponse.DiscardUnknown(m) + } +@@ -738,6 +790,7 @@ func (x SyncState) String() string { + } + return strconv.Itoa(int(x)) + } ++ + func (this *ReplicateRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil +@@ -781,6 +834,7 @@ func (this *ReplicateRequest) Equal(that interface{}) bool { + } + return true + } ++ + func (this *SyncPosition) Equal(that interface{}) bool { + if that == nil { + return this == nil +@@ -810,8 +864,10 @@ func (this *SyncPosition) Equal(that interface{}) bool { + } + + // Reference imports to suppress errors if they are not otherwise used. +-var _ context.Context +-var _ grpc.ClientConn ++var ( ++ _ context.Context ++ _ grpc.ClientConn ++) + + // This is a compile-time assertion to ensure that this generated file + // is compatible with the grpc package it is being compiled against. +@@ -1022,18 +1078,20 @@ type ReplicatorServer interface { + } + + // UnimplementedReplicatorServer can be embedded to have forward compatible implementations. +-type UnimplementedReplicatorServer struct { +-} ++type UnimplementedReplicatorServer struct{} + + func (*UnimplementedReplicatorServer) Replicate(srv Replicator_ReplicateServer) error { + return status.Errorf(codes.Unimplemented, "method Replicate not implemented") + } ++ + func (*UnimplementedReplicatorServer) SyncInit(ctx context.Context, req *SyncInitRequest) (*SyncInitResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SyncInit not implemented") + } ++ + func (*UnimplementedReplicatorServer) SyncReplicate(ctx context.Context, req *SyncReplicateRequest) (*SyncReplicateResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SyncReplicate not implemented") + } ++ + func (*UnimplementedReplicatorServer) SyncReplicateStream(srv Replicator_SyncReplicateStreamServer) error { + return status.Errorf(codes.Unimplemented, "method SyncReplicateStream not implemented") + } +@@ -1612,6 +1670,7 @@ func encodeVarintReplicator(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + return base + } ++ + func NewPopulatedReplicateRequest(r randyReplicator, easy bool) *ReplicateRequest { + this := &ReplicateRequest{} + this.TopicID = github_com_kakao_varlog_pkg_types.TopicID(r.Int31()) +@@ -1659,6 +1718,7 @@ func randUTF8RuneReplicator(r randyReplicator) rune { + } + return rune(ru + 61) + } ++ + func randStringReplicator(r randyReplicator) string { + v4 := r.Intn(100) + tmps := make([]rune, v4) +@@ -1667,6 +1727,7 @@ func randStringReplicator(r randyReplicator) string { + } + return string(tmps) + } ++ + func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []byte) { + l := r.Intn(5) + for i := 0; i < l; i++ { +@@ -1679,6 +1740,7 @@ func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []b + } + return dAtA + } ++ + func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire int) []byte { + key := uint32(fieldNumber)<<3 | uint32(wire) + switch wire { +@@ -1705,6 +1767,7 @@ func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire i + } + return dAtA + } ++ + func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { + for v >= 1<<7 { + dAtA = append(dAtA, uint8(uint64(v)&0x7f|0x80)) +@@ -1713,6 +1776,7 @@ func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { + dAtA = append(dAtA, uint8(v)) + return dAtA + } ++ + func (m *ReplicateRequest) ProtoSize() (n int) { + if m == nil { + return 0 +@@ -1881,9 +1945,11 @@ func (m *SyncReplicateResponse) ProtoSize() (n int) { + func sovReplicator(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 + } ++ + func sozReplicator(x uint64) (n int) { + return sovReplicator(uint64((x << 1) ^ uint64((int64(x) >> 63)))) + } ++ + func (this *SyncPayload) GetValue() interface{} { + if this.CommitContext != nil { + return this.CommitContext +@@ -1905,6 +1971,7 @@ func (this *SyncPayload) SetValue(value interface{}) bool { + } + return true + } ++ + func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2025,7 +2092,8 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { + } + elementCount = count + if elementCount != 0 && len(m.LLSN) == 0 { +- m.LLSN = make([]github_com_kakao_varlog_pkg_types.LLSN, 0, elementCount) ++ m.LLSN = slices.Grow(m.LLSN, elementCount) ++ m.LLSN = m.LLSN[:0] + } + for iNdEx < postIndex { + var v github_com_kakao_varlog_pkg_types.LLSN +@@ -2077,8 +2145,15 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { + if postIndex > l { + return io.ErrUnexpectedEOF + } +- m.Data = append(m.Data, make([]byte, postIndex-iNdEx)) +- copy(m.Data[len(m.Data)-1], dAtA[iNdEx:postIndex]) ++ if len(m.Data) < cap(m.Data) { ++ m.Data = m.Data[:len(m.Data)+1] ++ } else { ++ m.Data = append(m.Data, nil) ++ } ++ lastIndex := len(m.Data) - 1 ++ m.Data[lastIndex] = slices.Grow(m.Data[lastIndex], postIndex-iNdEx) ++ m.Data[lastIndex] = m.Data[lastIndex][:postIndex-iNdEx] ++ copy(m.Data[lastIndex], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex +@@ -2101,6 +2176,7 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2151,6 +2227,7 @@ func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncPosition) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2239,6 +2316,7 @@ func (m *SyncPosition) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncRange) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2327,6 +2405,7 @@ func (m *SyncRange) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2514,6 +2593,7 @@ func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2597,6 +2677,7 @@ func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncStatus) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2765,6 +2846,7 @@ func (m *SyncStatus) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncPayload) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2887,6 +2969,7 @@ func (m *SyncPayload) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -3055,6 +3138,7 @@ func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -3141,6 +3225,7 @@ func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func skipReplicator(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 diff --git a/proto/snpb/replicator.go b/proto/snpb/replicator.go index 6d9800cf8..2c28ca8f6 100644 --- a/proto/snpb/replicator.go +++ b/proto/snpb/replicator.go @@ -6,6 +6,17 @@ import ( "github.com/kakao/varlog/pkg/types" ) +func (m *ReplicateRequest) ResetReuse() { + llsnSlice := m.LLSN[:0] + for i := range m.Data { + m.Data[i] = m.Data[i][:0] + } + dataSlice := m.Data[:0] + m.Reset() + m.LLSN = llsnSlice + m.Data = dataSlice +} + func InvalidSyncPosition() SyncPosition { return SyncPosition{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN} } diff --git a/proto/snpb/replicator.pb.go b/proto/snpb/replicator.pb.go index 139a31eb6..e5105c631 100644 --- a/proto/snpb/replicator.pb.go +++ b/proto/snpb/replicator.pb.go @@ -10,6 +10,7 @@ import ( io "io" math "math" math_bits "math/bits" + "slices" strconv "strconv" _ "github.com/gogo/protobuf/gogoproto" @@ -24,9 +25,11 @@ import ( ) // Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf +var ( + _ = proto.Marshal + _ = fmt.Errorf + _ = math.Inf +) // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. @@ -80,9 +83,11 @@ func (*ReplicateRequest) ProtoMessage() {} func (*ReplicateRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{0} } + func (m *ReplicateRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_ReplicateRequest.Marshal(b, m, deterministic) @@ -95,12 +100,15 @@ func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, er return b[:n], nil } } + func (m *ReplicateRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_ReplicateRequest.Merge(m, src) } + func (m *ReplicateRequest) XXX_Size() int { return m.ProtoSize() } + func (m *ReplicateRequest) XXX_DiscardUnknown() { xxx_messageInfo_ReplicateRequest.DiscardUnknown(m) } @@ -135,8 +143,7 @@ func (m *ReplicateRequest) GetData() [][]byte { return nil } -type ReplicateResponse struct { -} +type ReplicateResponse struct{} func (m *ReplicateResponse) Reset() { *m = ReplicateResponse{} } func (m *ReplicateResponse) String() string { return proto.CompactTextString(m) } @@ -144,9 +151,11 @@ func (*ReplicateResponse) ProtoMessage() {} func (*ReplicateResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{1} } + func (m *ReplicateResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_ReplicateResponse.Marshal(b, m, deterministic) @@ -159,12 +168,15 @@ func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, e return b[:n], nil } } + func (m *ReplicateResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_ReplicateResponse.Merge(m, src) } + func (m *ReplicateResponse) XXX_Size() int { return m.ProtoSize() } + func (m *ReplicateResponse) XXX_DiscardUnknown() { xxx_messageInfo_ReplicateResponse.DiscardUnknown(m) } @@ -182,9 +194,11 @@ func (*SyncPosition) ProtoMessage() {} func (*SyncPosition) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{2} } + func (m *SyncPosition) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncPosition.Marshal(b, m, deterministic) @@ -197,12 +211,15 @@ func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } + func (m *SyncPosition) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncPosition.Merge(m, src) } + func (m *SyncPosition) XXX_Size() int { return m.ProtoSize() } + func (m *SyncPosition) XXX_DiscardUnknown() { xxx_messageInfo_SyncPosition.DiscardUnknown(m) } @@ -239,9 +256,11 @@ func (*SyncRange) ProtoMessage() {} func (*SyncRange) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{3} } + func (m *SyncRange) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncRange.Marshal(b, m, deterministic) @@ -254,12 +273,15 @@ func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } + func (m *SyncRange) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncRange.Merge(m, src) } + func (m *SyncRange) XXX_Size() int { return m.ProtoSize() } + func (m *SyncRange) XXX_DiscardUnknown() { xxx_messageInfo_SyncRange.DiscardUnknown(m) } @@ -305,9 +327,11 @@ func (*SyncInitRequest) ProtoMessage() {} func (*SyncInitRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{4} } + func (m *SyncInitRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncInitRequest.Marshal(b, m, deterministic) @@ -320,12 +344,15 @@ func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, err return b[:n], nil } } + func (m *SyncInitRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncInitRequest.Merge(m, src) } + func (m *SyncInitRequest) XXX_Size() int { return m.ProtoSize() } + func (m *SyncInitRequest) XXX_DiscardUnknown() { xxx_messageInfo_SyncInitRequest.DiscardUnknown(m) } @@ -381,9 +408,11 @@ func (*SyncInitResponse) ProtoMessage() {} func (*SyncInitResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{5} } + func (m *SyncInitResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncInitResponse.Marshal(b, m, deterministic) @@ -396,12 +425,15 @@ func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, er return b[:n], nil } } + func (m *SyncInitResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncInitResponse.Merge(m, src) } + func (m *SyncInitResponse) XXX_Size() int { return m.ProtoSize() } + func (m *SyncInitResponse) XXX_DiscardUnknown() { xxx_messageInfo_SyncInitResponse.DiscardUnknown(m) } @@ -428,9 +460,11 @@ func (*SyncStatus) ProtoMessage() {} func (*SyncStatus) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{6} } + func (m *SyncStatus) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncStatus.Marshal(b, m, deterministic) @@ -443,12 +477,15 @@ func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } + func (m *SyncStatus) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncStatus.Merge(m, src) } + func (m *SyncStatus) XXX_Size() int { return m.ProtoSize() } + func (m *SyncStatus) XXX_DiscardUnknown() { xxx_messageInfo_SyncStatus.DiscardUnknown(m) } @@ -494,9 +531,11 @@ func (*SyncPayload) ProtoMessage() {} func (*SyncPayload) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{7} } + func (m *SyncPayload) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncPayload.Marshal(b, m, deterministic) @@ -509,12 +548,15 @@ func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } + func (m *SyncPayload) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncPayload.Merge(m, src) } + func (m *SyncPayload) XXX_Size() int { return m.ProtoSize() } + func (m *SyncPayload) XXX_DiscardUnknown() { xxx_messageInfo_SyncPayload.DiscardUnknown(m) } @@ -548,9 +590,11 @@ func (*SyncReplicateRequest) ProtoMessage() {} func (*SyncReplicateRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{8} } + func (m *SyncReplicateRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncReplicateRequest.Marshal(b, m, deterministic) @@ -563,12 +607,15 @@ func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte return b[:n], nil } } + func (m *SyncReplicateRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncReplicateRequest.Merge(m, src) } + func (m *SyncReplicateRequest) XXX_Size() int { return m.ProtoSize() } + func (m *SyncReplicateRequest) XXX_DiscardUnknown() { xxx_messageInfo_SyncReplicateRequest.DiscardUnknown(m) } @@ -613,9 +660,11 @@ func (*SyncReplicateResponse) ProtoMessage() {} func (*SyncReplicateResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{9} } + func (m *SyncReplicateResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncReplicateResponse.Marshal(b, m, deterministic) @@ -628,12 +677,15 @@ func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byt return b[:n], nil } } + func (m *SyncReplicateResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncReplicateResponse.Merge(m, src) } + func (m *SyncReplicateResponse) XXX_Size() int { return m.ProtoSize() } + func (m *SyncReplicateResponse) XXX_DiscardUnknown() { xxx_messageInfo_SyncReplicateResponse.DiscardUnknown(m) } @@ -738,6 +790,7 @@ func (x SyncState) String() string { } return strconv.Itoa(int(x)) } + func (this *ReplicateRequest) Equal(that interface{}) bool { if that == nil { return this == nil @@ -781,6 +834,7 @@ func (this *ReplicateRequest) Equal(that interface{}) bool { } return true } + func (this *SyncPosition) Equal(that interface{}) bool { if that == nil { return this == nil @@ -810,8 +864,10 @@ func (this *SyncPosition) Equal(that interface{}) bool { } // Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn +var ( + _ context.Context + _ grpc.ClientConn +) // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. @@ -1022,18 +1078,20 @@ type ReplicatorServer interface { } // UnimplementedReplicatorServer can be embedded to have forward compatible implementations. -type UnimplementedReplicatorServer struct { -} +type UnimplementedReplicatorServer struct{} func (*UnimplementedReplicatorServer) Replicate(srv Replicator_ReplicateServer) error { return status.Errorf(codes.Unimplemented, "method Replicate not implemented") } + func (*UnimplementedReplicatorServer) SyncInit(ctx context.Context, req *SyncInitRequest) (*SyncInitResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SyncInit not implemented") } + func (*UnimplementedReplicatorServer) SyncReplicate(ctx context.Context, req *SyncReplicateRequest) (*SyncReplicateResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SyncReplicate not implemented") } + func (*UnimplementedReplicatorServer) SyncReplicateStream(srv Replicator_SyncReplicateStreamServer) error { return status.Errorf(codes.Unimplemented, "method SyncReplicateStream not implemented") } @@ -1612,6 +1670,7 @@ func encodeVarintReplicator(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } + func NewPopulatedReplicateRequest(r randyReplicator, easy bool) *ReplicateRequest { this := &ReplicateRequest{} this.TopicID = github_com_kakao_varlog_pkg_types.TopicID(r.Int31()) @@ -1659,6 +1718,7 @@ func randUTF8RuneReplicator(r randyReplicator) rune { } return rune(ru + 61) } + func randStringReplicator(r randyReplicator) string { v4 := r.Intn(100) tmps := make([]rune, v4) @@ -1667,6 +1727,7 @@ func randStringReplicator(r randyReplicator) string { } return string(tmps) } + func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []byte) { l := r.Intn(5) for i := 0; i < l; i++ { @@ -1679,6 +1740,7 @@ func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []b } return dAtA } + func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire int) []byte { key := uint32(fieldNumber)<<3 | uint32(wire) switch wire { @@ -1705,6 +1767,7 @@ func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire i } return dAtA } + func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { for v >= 1<<7 { dAtA = append(dAtA, uint8(uint64(v)&0x7f|0x80)) @@ -1713,6 +1776,7 @@ func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { dAtA = append(dAtA, uint8(v)) return dAtA } + func (m *ReplicateRequest) ProtoSize() (n int) { if m == nil { return 0 @@ -1881,9 +1945,11 @@ func (m *SyncReplicateResponse) ProtoSize() (n int) { func sovReplicator(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } + func sozReplicator(x uint64) (n int) { return sovReplicator(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } + func (this *SyncPayload) GetValue() interface{} { if this.CommitContext != nil { return this.CommitContext @@ -1905,6 +1971,7 @@ func (this *SyncPayload) SetValue(value interface{}) bool { } return true } + func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2025,7 +2092,8 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { } elementCount = count if elementCount != 0 && len(m.LLSN) == 0 { - m.LLSN = make([]github_com_kakao_varlog_pkg_types.LLSN, 0, elementCount) + m.LLSN = slices.Grow(m.LLSN, elementCount) + m.LLSN = m.LLSN[:0] } for iNdEx < postIndex { var v github_com_kakao_varlog_pkg_types.LLSN @@ -2077,8 +2145,15 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Data = append(m.Data, make([]byte, postIndex-iNdEx)) - copy(m.Data[len(m.Data)-1], dAtA[iNdEx:postIndex]) + if len(m.Data) < cap(m.Data) { + m.Data = m.Data[:len(m.Data)+1] + } else { + m.Data = append(m.Data, nil) + } + lastIndex := len(m.Data) - 1 + m.Data[lastIndex] = slices.Grow(m.Data[lastIndex], postIndex-iNdEx) + m.Data[lastIndex] = m.Data[lastIndex][:postIndex-iNdEx] + copy(m.Data[lastIndex], dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex @@ -2101,6 +2176,7 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2151,6 +2227,7 @@ func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncPosition) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2239,6 +2316,7 @@ func (m *SyncPosition) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncRange) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2327,6 +2405,7 @@ func (m *SyncRange) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2514,6 +2593,7 @@ func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2597,6 +2677,7 @@ func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncStatus) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2765,6 +2846,7 @@ func (m *SyncStatus) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncPayload) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2887,6 +2969,7 @@ func (m *SyncPayload) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3055,6 +3138,7 @@ func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3141,6 +3225,7 @@ func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { } return nil } + func skipReplicator(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/proto/snpb/replicator_test.go b/proto/snpb/replicator_test.go index 437c06d5d..d506eeaeb 100644 --- a/proto/snpb/replicator_test.go +++ b/proto/snpb/replicator_test.go @@ -2,10 +2,179 @@ package snpb import ( "testing" + "unsafe" "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" ) +func getPointers[S ~[]E, E any](elems S) []uintptr { + ptrs := make([]uintptr, len(elems)) + for i := range elems { + p := uintptr(unsafe.Pointer(&elems[i])) + ptrs[i] = p + } + return ptrs +} + +func get2DPointers[S ~[][]E, E any](elems S) [][]uintptr { + ptrs := make([][]uintptr, len(elems)) + for i := range elems { + ptrs[i] = getPointers(elems[i]) + } + return ptrs +} + +func TestReplicateRequest(t *testing.T) { + smallRequest := ReplicateRequest{ + LLSN: []types.LLSN{1}, + Data: [][]byte{[]byte("I")}, + } + smallPayload, err := smallRequest.Marshal() + require.NoError(t, err) + + mediumRequest := ReplicateRequest{ + LLSN: []types.LLSN{11, 12, 13, 14, 15}, + Data: [][]byte{[]byte("XI"), []byte("XII"), []byte("XIII"), []byte("XIV"), []byte("XV")}, + } + mediumPayload, err := mediumRequest.Marshal() + require.NoError(t, err) + + largeRequest := ReplicateRequest{ + LLSN: []types.LLSN{21, 22, 23, 24, 25, 26, 27, 28, 29, 30}, + Data: [][]byte{[]byte("XXI"), []byte("XXII"), []byte("XXIII"), []byte("XXIV"), []byte("XXV"), []byte("XXVI"), []byte("XXVII"), []byte("XXVIII"), []byte("XXIX"), []byte("XXX")}, + } + laregePayload, err := largeRequest.Marshal() + require.NoError(t, err) + + t.Run("EqualLength", func(t *testing.T) { + tcs := []struct { + name string + req ReplicateRequest + payload []byte + }{ + { + name: "Small", + req: smallRequest, + payload: smallPayload, + }, + { + name: "Medium", + req: mediumRequest, + payload: mediumPayload, + }, + { + name: "Large", + req: largeRequest, + payload: laregePayload, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + var req ReplicateRequest + + err := req.Unmarshal(tc.payload) + require.NoError(t, err) + require.Equal(t, tc.req, req) + wantLLSNPtrs := getPointers(req.LLSN) + wantDataPtrs := get2DPointers(req.Data) + + req.ResetReuse() + err = req.Unmarshal(tc.payload) + require.NoError(t, err) + require.Equal(t, tc.req, req) + gotLLSNPtrs := getPointers(req.LLSN) + gotDataPtrs := get2DPointers(req.Data) + + require.Equal(t, wantLLSNPtrs, gotLLSNPtrs) + require.Equal(t, wantDataPtrs, gotDataPtrs) + }) + } + }) + + t.Run("GrowingLengthGrowingCapacity", func(t *testing.T) { + var req ReplicateRequest + + err := req.Unmarshal(smallPayload) + require.NoError(t, err) + require.Equal(t, smallRequest, req) + smallLLSNPtrs := getPointers(req.LLSN) + smallDataPtrs := get2DPointers(req.Data) + smallLLSNCap := cap(req.LLSN) + smallDataCap := cap(req.Data) + + req.ResetReuse() + err = req.Unmarshal(mediumPayload) + require.NoError(t, err) + require.Equal(t, mediumRequest, req) + mediumLLSNPtrs := getPointers(req.LLSN) + mediumDataPtrs := get2DPointers(req.Data) + mediumLLSNCap := cap(req.LLSN) + mediumDataCap := cap(req.Data) + + require.NotEqual(t, smallLLSNPtrs, mediumLLSNPtrs) + require.NotEqual(t, smallDataPtrs, mediumDataPtrs) + require.Less(t, smallLLSNCap, mediumLLSNCap) + require.Less(t, smallDataCap, mediumDataCap) + + req.ResetReuse() + err = req.Unmarshal(laregePayload) + require.NoError(t, err) + require.Equal(t, largeRequest, req) + largeLLSNPtrs := getPointers(req.LLSN) + largeDataPtrs := get2DPointers(req.Data) + largeLLSNCap := cap(req.LLSN) + largeDataCap := cap(req.Data) + + require.NotEqual(t, mediumLLSNPtrs, largeLLSNPtrs) + require.NotEqual(t, mediumDataPtrs, largeDataPtrs) + require.Less(t, mediumLLSNCap, largeLLSNCap) + require.Less(t, mediumDataCap, largeDataCap) + }) + + t.Run("ShrinkingLengthEqualCapacity", func(t *testing.T) { + var req ReplicateRequest + + err := req.Unmarshal(laregePayload) + require.NoError(t, err) + require.Equal(t, largeRequest, req) + largeLLSNPtrs := getPointers(req.LLSN) + largeDataPtrs := get2DPointers(req.Data) + largeLLSNCap := cap(req.LLSN) + largeDataCap := cap(req.Data) + + req.ResetReuse() + err = req.Unmarshal(mediumPayload) + require.NoError(t, err) + require.Equal(t, mediumRequest, req) + mediumLLSNPtrs := getPointers(req.LLSN) + mediumDataPtrs := get2DPointers(req.Data) + mediumLLSNCap := cap(req.LLSN) + mediumDataCap := cap(req.Data) + + require.NotEqual(t, largeLLSNPtrs, mediumLLSNPtrs) + require.NotEqual(t, largeDataPtrs, mediumDataPtrs) + require.Equal(t, largeLLSNCap, mediumLLSNCap) + require.Equal(t, largeDataCap, mediumDataCap) + + req.ResetReuse() + err = req.Unmarshal(smallPayload) + require.NoError(t, err) + require.Equal(t, smallRequest, req) + smallLLSNPtrs := getPointers(req.LLSN) + smallDataPtrs := get2DPointers(req.Data) + smallLLSNCap := cap(req.LLSN) + smallDataCap := cap(req.Data) + + require.NotEqual(t, mediumLLSNPtrs, smallLLSNPtrs) + require.NotEqual(t, mediumDataPtrs, smallDataPtrs) + require.Equal(t, mediumLLSNCap, smallLLSNCap) + require.Equal(t, mediumDataCap, smallDataCap) + }) +} + func TestSyncPositionInvalid(t *testing.T) { tcs := []struct { input SyncPosition