diff --git a/proto/snpb/metadata.go b/proto/snpb/metadata.go index 3e89df940..cd8710708 100644 --- a/proto/snpb/metadata.go +++ b/proto/snpb/metadata.go @@ -5,6 +5,9 @@ import ( "github.com/kakao/varlog/proto/varlogpb" ) +// ToStorageNodeDescriptor converts a StorageNodeMetadataDescriptor to a +// varlogpb.StorageNodeDescriptor. It returns nil if the +// StorageNodeMetadataDescriptor is nil. func (snmd *StorageNodeMetadataDescriptor) ToStorageNodeDescriptor() *varlogpb.StorageNodeDescriptor { if snmd == nil { return nil @@ -19,6 +22,9 @@ func (snmd *StorageNodeMetadataDescriptor) ToStorageNodeDescriptor() *varlogpb.S return snd } +// GetLogStream retrieves a LogStreamReplicaMetadataDescriptor by its +// LogStreamID. It returns the LogStreamReplicaMetadataDescriptor and true if +// found, otherwise an empty descriptor and false. func (snmd *StorageNodeMetadataDescriptor) GetLogStream(logStreamID types.LogStreamID) (LogStreamReplicaMetadataDescriptor, bool) { logStreams := snmd.GetLogStreamReplicas() for i := range logStreams { @@ -29,6 +35,10 @@ func (snmd *StorageNodeMetadataDescriptor) GetLogStream(logStreamID types.LogStr return LogStreamReplicaMetadataDescriptor{}, false } +// Head returns the varlogpb.LogEntryMeta corresponding to the local low +// watermark of the LogStreamReplicaMetadataDescriptor. The "head" represents +// the earliest log entry in the log stream replica. It returns an empty +// varlogpb.LogEntryMeta if the LogStreamReplicaMetadataDescriptor is nil. func (lsrmd *LogStreamReplicaMetadataDescriptor) Head() varlogpb.LogEntryMeta { if lsrmd == nil { return varlogpb.LogEntryMeta{} @@ -41,6 +51,10 @@ func (lsrmd *LogStreamReplicaMetadataDescriptor) Head() varlogpb.LogEntryMeta { } } +// Tail returns the varlogpb.LogEntryMeta corresponding to the local high +// watermark of the LogStreamReplicaMetadataDescriptor. The "tail" represents +// the latest log entry in the log stream replica. It returns an empty +// varlogpb.LogEntryMeta if the LogStreamReplicaMetadataDescriptor is nil. func (lsrmd *LogStreamReplicaMetadataDescriptor) Tail() varlogpb.LogEntryMeta { if lsrmd == nil { return varlogpb.LogEntryMeta{} diff --git a/proto/snpb/metadata_test.go b/proto/snpb/metadata_test.go new file mode 100644 index 000000000..293af33d6 --- /dev/null +++ b/proto/snpb/metadata_test.go @@ -0,0 +1,191 @@ +package snpb_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" + "github.com/kakao/varlog/proto/snpb" + "github.com/kakao/varlog/proto/varlogpb" +) + +func TestStorageNodeMetadataDescriptor_ToStorageNodeDescriptor(t *testing.T) { + tcs := []struct { + snmd *snpb.StorageNodeMetadataDescriptor + want *varlogpb.StorageNodeDescriptor + name string + }{ + { + name: "Nil", + snmd: nil, + want: nil, + }, + { + name: "NonNil", + snmd: &snpb.StorageNodeMetadataDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: types.MinStorageNodeID, + Address: "node1", + }, + Storages: []varlogpb.StorageDescriptor{ + {Path: "/path1"}, + {Path: "/path2"}, + }, + }, + want: &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: types.MinStorageNodeID, + Address: "node1", + }, + Paths: []string{"/path1", "/path2"}, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.snmd.ToStorageNodeDescriptor() + require.Equal(t, tc.want, got) + }) + } +} + +func TestStorageNodeMetadataDescriptor_GetLogStream(t *testing.T) { + tcs := []struct { + name string + logStreamID types.LogStreamID + want snpb.LogStreamReplicaMetadataDescriptor + wantFound bool + }{ + { + name: "Found", + logStreamID: types.LogStreamID(1), + want: snpb.LogStreamReplicaMetadataDescriptor{ + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + LogStreamID: types.LogStreamID(1), + }, + }, + }, + wantFound: true, + }, + { + name: "NotFound", + logStreamID: types.LogStreamID(3), + want: snpb.LogStreamReplicaMetadataDescriptor{}, + wantFound: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + snmd := &snpb.StorageNodeMetadataDescriptor{ + LogStreamReplicas: []snpb.LogStreamReplicaMetadataDescriptor{ + { + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + LogStreamID: types.LogStreamID(1), + }, + }, + }, + { + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + LogStreamID: types.LogStreamID(2), + }, + }, + }, + }, + } + + got, found := snmd.GetLogStream(tc.logStreamID) + require.Equal(t, tc.wantFound, found) + require.Equal(t, tc.want, got) + }) + } +} + +func TestLogStreamReplicaMetadataDescriptor_Head(t *testing.T) { + tcs := []struct { + lsrmd *snpb.LogStreamReplicaMetadataDescriptor + name string + want varlogpb.LogEntryMeta + }{ + { + name: "Nil", + lsrmd: nil, + want: varlogpb.LogEntryMeta{}, + }, + { + name: "NonNil", + lsrmd: &snpb.LogStreamReplicaMetadataDescriptor{ + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(2), + }, + }, + LocalLowWatermark: varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(3), + GLSN: types.GLSN(4), + }, + }, + want: varlogpb.LogEntryMeta{ + TopicID: 1, + LogStreamID: 2, + LLSN: 3, + GLSN: 4, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.lsrmd.Head() + require.Equal(t, tc.want, got) + }) + } +} + +func TestLogStreamReplicaMetadataDescriptor_Tail(t *testing.T) { + tcs := []struct { + lsrmd *snpb.LogStreamReplicaMetadataDescriptor + name string + want varlogpb.LogEntryMeta + }{ + { + name: "Nil", + lsrmd: nil, + want: varlogpb.LogEntryMeta{}, + }, + { + name: "NonNil", + lsrmd: &snpb.LogStreamReplicaMetadataDescriptor{ + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(2), + }, + }, + LocalHighWatermark: varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(5), + GLSN: types.GLSN(6), + }, + }, + want: varlogpb.LogEntryMeta{ + TopicID: 1, + LogStreamID: 2, + LLSN: 5, + GLSN: 6, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.lsrmd.Tail() + require.Equal(t, tc.want, got) + }) + } +}