Skip to content

Commit

Permalink
feat(varlogtest): add initial metadata repository and support fetch APIs
Browse files Browse the repository at this point in the history
This PR adds an option to set initial metadata repository nodes and supports fetch APIs for the
metadata repository nodes in the varlogtest admin. Although it doesn't provide mutation APIs for the
metadata repository yet, it will be helpful to test the varlog.
  • Loading branch information
ijsong committed Sep 11, 2023
1 parent 3df49ac commit 9aae719
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 4 deletions.
44 changes: 41 additions & 3 deletions pkg/varlogtest/admin.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package varlogtest

import (
"cmp"
"context"
"fmt"
"path/filepath"
"slices"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -466,15 +469,50 @@ func (c *testAdmin) Trim(ctx context.Context, topicID types.TopicID, lastGLSN ty
}

func (c *testAdmin) GetMetadataRepositoryNode(ctx context.Context, nid types.NodeID, opts ...varlog.AdminCallOption) (*varlogpb.MetadataRepositoryNode, error) {
panic("not implemented")
if err := c.lock(); err != nil {
return nil, err
}
defer c.unlock()

mrn, ok := c.vt.mrns[nid]
if !ok {
return nil, fmt.Errorf("metadata repository node: %w", verrors.ErrNotExist)
}
return &mrn, nil
}

func (c *testAdmin) ListMetadataRepositoryNodes(ctx context.Context, opts ...varlog.AdminCallOption) ([]varlogpb.MetadataRepositoryNode, error) {
panic("not implemented")
if err := c.lock(); err != nil {
return nil, err
}
defer c.unlock()

ret := make([]varlogpb.MetadataRepositoryNode, 0, len(c.vt.mrns))
for _, mrn := range c.vt.mrns {
ret = append(ret, mrn)
}
slices.SortFunc(ret, func(a, b varlogpb.MetadataRepositoryNode) int {
return cmp.Compare(a.NodeID, b.NodeID)
})
return ret, nil
}

func (c *testAdmin) GetMRMembers(ctx context.Context, opts ...varlog.AdminCallOption) (*admpb.GetMRMembersResponse, error) {
panic("not implemented")
if err := c.lock(); err != nil {
return nil, err
}
defer c.unlock()

rsp := &admpb.GetMRMembersResponse{
Leader: c.vt.leaderMR,
ReplicationFactor: int32(c.vt.replicationFactor),
Members: make(map[types.NodeID]string, len(c.vt.mrns)),
}
for _, mrn := range c.vt.mrns {
rsp.Members[mrn.NodeID] = mrn.RaftURL
}

return rsp, nil
}

func (c *testAdmin) AddMetadataRepositoryNode(ctx context.Context, raftURL, rpcAddr string, opts ...varlog.AdminCallOption) (*varlogpb.MetadataRepositoryNode, error) {
Expand Down
33 changes: 32 additions & 1 deletion pkg/varlogtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,35 @@ import (
"fmt"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

type config struct {
clusterID types.ClusterID
replicationFactor int
initialMRNodes []varlogpb.MetadataRepositoryNode
}

func newConfig(opts []Option) (config, error) {
cfg := config{}
cfg := config{
initialMRNodes: []varlogpb.MetadataRepositoryNode{
{
NodeID: types.NewNodeIDFromURL("http://127.0.10.1:9091"),
RaftURL: "http://127.0.10.1:9091",
RPCAddr: "127.0.10.1:9092",
},
{
NodeID: types.NewNodeIDFromURL("http://127.0.10.2:9091"),
RaftURL: "http://127.0.10.2:9091",
RPCAddr: "127.0.10.2:9092",
},
{
NodeID: types.NewNodeIDFromURL("http://127.0.10.3:9091"),
RaftURL: "http://127.0.10.3:9091",
RPCAddr: "127.0.10.3:9092",
},
},
}
for _, opt := range opts {
opt.apply(&cfg)
}
Expand All @@ -29,6 +49,11 @@ func (cfg *config) validate() error {
if cfg.replicationFactor < 1 {
return fmt.Errorf("invalid replication factor %d", cfg.replicationFactor)
}
for _, mrn := range cfg.initialMRNodes {
if mrn.NodeID == types.InvalidNodeID {
return fmt.Errorf("invalid metadata repository node id %d", mrn.NodeID)
}
}
return nil
}

Expand Down Expand Up @@ -59,3 +84,9 @@ func WithReplicationFactor(repfactor int) Option {
cfg.replicationFactor = repfactor
})
}

func WithInitialMetadataRepositoryNodes(mrns ...varlogpb.MetadataRepositoryNode) Option {
return newFuncOption(func(cfg *config) {
cfg.initialMRNodes = mrns
})
}
14 changes: 14 additions & 0 deletions pkg/varlogtest/varlogtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type VarlogTest struct {
version types.Version
trimGLSNs map[types.TopicID]types.GLSN

mrns map[types.NodeID]varlogpb.MetadataRepositoryNode
leaderMR types.NodeID

nextTopicID types.TopicID
nextStorageNodeID types.StorageNodeID
nextLogStreamID types.LogStreamID
Expand All @@ -55,10 +58,21 @@ func New(opts ...Option) (*VarlogTest, error) {
globalLogEntries: make(map[types.TopicID][]*varlogpb.LogEntry),
localLogEntries: make(map[types.LogStreamID][]*varlogpb.LogEntry),
trimGLSNs: make(map[types.TopicID]types.GLSN),
mrns: make(map[types.NodeID]varlogpb.MetadataRepositoryNode, len(cfg.initialMRNodes)),
leaderMR: types.InvalidNodeID,
}
vt.cond = sync.NewCond(&vt.mu)
vt.admin = &testAdmin{vt: vt}
vt.vlg = &testLog{vt: vt}

for _, mrn := range vt.initialMRNodes {
if vt.leaderMR == types.InvalidNodeID {
vt.leaderMR = mrn.NodeID
mrn.Leader = true
}
vt.mrns[mrn.NodeID] = mrn
}

return vt, nil
}

Expand Down
104 changes: 104 additions & 0 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"math/rand"
"slices"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -684,6 +685,109 @@ func TestVarlogTest_Trim(t *testing.T) {
assert.NoError(t, subscriber.Close())
}

func TestVarlogTestAdminMetadataRepository(t *testing.T) {
const (
cid = types.ClusterID(1)
repfactor = 3
)

ctx := context.Background()
initialMRNs := []varlogpb.MetadataRepositoryNode{
{
NodeID: types.NewNodeIDFromURL("http://127.0.20.1:9091"),
RaftURL: "http://127.0.20.1:9091",
RPCAddr: "127.0.20.1:9092",
},
{
NodeID: types.NewNodeIDFromURL("http://127.0.20.2:9091"),
RaftURL: "http://127.0.20.2:9091",
RPCAddr: "127.0.20.2:9092",
},
{
NodeID: types.NewNodeIDFromURL("http://127.0.20.3:9091"),
RaftURL: "http://127.0.20.3:9091",
RPCAddr: "127.0.20.3:9092",
},
}

tcs := []struct {
name string
initialMRNs []varlogpb.MetadataRepositoryNode
testf func(t *testing.T, vadm varlog.Admin)
}{
{
name: "NoInitialMRs",
initialMRNs: []varlogpb.MetadataRepositoryNode{},
testf: func(t *testing.T, vadm varlog.Admin) {
mrns, err := vadm.ListMetadataRepositoryNodes(ctx)
require.NoError(t, err)
require.Empty(t, mrns)
},
},
{
name: "ListMetadataRepositoryNodes",
initialMRNs: initialMRNs,
testf: func(t *testing.T, vadm varlog.Admin) {
mrns, err := vadm.ListMetadataRepositoryNodes(ctx)
require.NoError(t, err)
require.Len(t, mrns, 3)

for _, initMRN := range initialMRNs {
require.True(t, slices.ContainsFunc(mrns, func(mrn varlogpb.MetadataRepositoryNode) bool {
return mrn.NodeID == initMRN.NodeID
}))
}

numLeaders := 0
for _, mrn := range mrns {
if mrn.Leader {
numLeaders++
}
}
require.Equal(t, 1, numLeaders)
},
},
{
name: "GetMetadataRepositoryNode",
initialMRNs: initialMRNs,
testf: func(t *testing.T, vadm varlog.Admin) {
for _, initialMRN := range initialMRNs {
_, err := vadm.GetMetadataRepositoryNode(ctx, initialMRN.NodeID)
require.NoError(t, err)
}
},
},
{
name: "GetMetadataRepositoryNode",
initialMRNs: initialMRNs,
testf: func(t *testing.T, vadm varlog.Admin) {
rsp, err := vadm.GetMRMembers(ctx)
require.NoError(t, err)

require.NotEqual(t, types.InvalidNodeID, rsp.Leader)
require.EqualValues(t, repfactor, rsp.ReplicationFactor)
require.Len(t, rsp.Members, len(initialMRNs))
for _, initialMRN := range initialMRNs {
require.Contains(t, rsp.Members, initialMRN.NodeID)
}
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
vt, err := varlogtest.New(
varlogtest.WithClusterID(cid),
varlogtest.WithReplicationFactor(repfactor),
varlogtest.WithInitialMetadataRepositoryNodes(tc.initialMRNs...),
)
require.NoError(t, err)

tc.testf(t, vt.Admin())
})
}
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

0 comments on commit 9aae719

Please sign in to comment.