From c033d621660acb0344d0378a913d40ccdac7970f Mon Sep 17 00:00:00 2001 From: Injun Song Date: Mon, 11 Sep 2023 20:05:44 +0900 Subject: [PATCH] feat(varlogtest): add initial metadata repository and support fetch APIs This PR adds an option to set initial metadata repository nodes and supports fetch APIs for the metadata repository nodes in the varlogtest admin. Although it doesn't provide mutation APIs for the metadata repository yet, it will be helpful to test the varlog. --- pkg/varlogtest/admin.go | 44 ++++++++++++- pkg/varlogtest/config.go | 33 +++++++++- pkg/varlogtest/varlogtest.go | 14 ++++ pkg/varlogtest/varlogtest_test.go | 104 ++++++++++++++++++++++++++++++ 4 files changed, 191 insertions(+), 4 deletions(-) diff --git a/pkg/varlogtest/admin.go b/pkg/varlogtest/admin.go index a13660aa0..4fc39a0e1 100644 --- a/pkg/varlogtest/admin.go +++ b/pkg/varlogtest/admin.go @@ -1,8 +1,11 @@ package varlogtest import ( + "cmp" "context" + "fmt" "path/filepath" + "slices" "time" "github.com/gogo/protobuf/proto" @@ -466,15 +469,50 @@ func (c *testAdmin) Trim(ctx context.Context, topicID types.TopicID, lastGLSN ty } func (c *testAdmin) GetMetadataRepositoryNode(ctx context.Context, nid types.NodeID, opts ...varlog.AdminCallOption) (*varlogpb.MetadataRepositoryNode, error) { - panic("not implemented") + if err := c.lock(); err != nil { + return nil, err + } + defer c.unlock() + + mrn, ok := c.vt.mrns[nid] + if !ok { + return nil, fmt.Errorf("metadata repository node: %w", verrors.ErrNotExist) + } + return &mrn, nil } func (c *testAdmin) ListMetadataRepositoryNodes(ctx context.Context, opts ...varlog.AdminCallOption) ([]varlogpb.MetadataRepositoryNode, error) { - panic("not implemented") + if err := c.lock(); err != nil { + return nil, err + } + defer c.unlock() + + ret := make([]varlogpb.MetadataRepositoryNode, 0, len(c.vt.mrns)) + for _, mrn := range c.vt.mrns { + ret = append(ret, mrn) + } + slices.SortFunc(ret, func(a, b varlogpb.MetadataRepositoryNode) int { + return cmp.Compare(a.NodeID, b.NodeID) + }) + return ret, nil } func (c *testAdmin) GetMRMembers(ctx context.Context, opts ...varlog.AdminCallOption) (*admpb.GetMRMembersResponse, error) { - panic("not implemented") + if err := c.lock(); err != nil { + return nil, err + } + defer c.unlock() + + rsp := &admpb.GetMRMembersResponse{ + Leader: c.vt.leaderMR, + ReplicationFactor: int32(c.vt.replicationFactor), + Members: make(map[types.NodeID]string, len(c.vt.mrns)), + } + for _, mrn := range c.vt.mrns { + rsp.Members[mrn.NodeID] = mrn.RaftURL + } + + return rsp, nil } func (c *testAdmin) AddMetadataRepositoryNode(ctx context.Context, raftURL, rpcAddr string, opts ...varlog.AdminCallOption) (*varlogpb.MetadataRepositoryNode, error) { diff --git a/pkg/varlogtest/config.go b/pkg/varlogtest/config.go index 32a3b3d17..da00db5b0 100644 --- a/pkg/varlogtest/config.go +++ b/pkg/varlogtest/config.go @@ -4,15 +4,35 @@ import ( "fmt" "github.com/kakao/varlog/pkg/types" + "github.com/kakao/varlog/proto/varlogpb" ) type config struct { clusterID types.ClusterID replicationFactor int + initialMRNodes []varlogpb.MetadataRepositoryNode } func newConfig(opts []Option) (config, error) { - cfg := config{} + cfg := config{ + initialMRNodes: []varlogpb.MetadataRepositoryNode{ + { + NodeID: types.NewNodeIDFromURL("http://127.0.10.1:9091"), + RaftURL: "http://127.0.10.1:9091", + RPCAddr: "127.0.10.1:9092", + }, + { + NodeID: types.NewNodeIDFromURL("http://127.0.10.2:9091"), + RaftURL: "http://127.0.10.2:9091", + RPCAddr: "127.0.10.2:9092", + }, + { + NodeID: types.NewNodeIDFromURL("http://127.0.10.3:9091"), + RaftURL: "http://127.0.10.3:9091", + RPCAddr: "127.0.10.3:9092", + }, + }, + } for _, opt := range opts { opt.apply(&cfg) } @@ -29,6 +49,11 @@ func (cfg *config) validate() error { if cfg.replicationFactor < 1 { return fmt.Errorf("invalid replication factor %d", cfg.replicationFactor) } + for _, mrn := range cfg.initialMRNodes { + if mrn.NodeID == types.InvalidNodeID { + return fmt.Errorf("invalid metadata repository node id %d", mrn.NodeID) + } + } return nil } @@ -59,3 +84,9 @@ func WithReplicationFactor(repfactor int) Option { cfg.replicationFactor = repfactor }) } + +func WithInitialMetadataRepositoryNodes(mrns ...varlogpb.MetadataRepositoryNode) Option { + return newFuncOption(func(cfg *config) { + cfg.initialMRNodes = mrns + }) +} diff --git a/pkg/varlogtest/varlogtest.go b/pkg/varlogtest/varlogtest.go index f9be967eb..d0688b80a 100644 --- a/pkg/varlogtest/varlogtest.go +++ b/pkg/varlogtest/varlogtest.go @@ -32,6 +32,9 @@ type VarlogTest struct { version types.Version trimGLSNs map[types.TopicID]types.GLSN + mrns map[types.NodeID]varlogpb.MetadataRepositoryNode + leaderMR types.NodeID + nextTopicID types.TopicID nextStorageNodeID types.StorageNodeID nextLogStreamID types.LogStreamID @@ -55,10 +58,21 @@ func New(opts ...Option) (*VarlogTest, error) { globalLogEntries: make(map[types.TopicID][]*varlogpb.LogEntry), localLogEntries: make(map[types.LogStreamID][]*varlogpb.LogEntry), trimGLSNs: make(map[types.TopicID]types.GLSN), + mrns: make(map[types.NodeID]varlogpb.MetadataRepositoryNode, len(cfg.initialMRNodes)), + leaderMR: types.InvalidNodeID, } vt.cond = sync.NewCond(&vt.mu) vt.admin = &testAdmin{vt: vt} vt.vlg = &testLog{vt: vt} + + for _, mrn := range vt.initialMRNodes { + if vt.leaderMR == types.InvalidNodeID { + vt.leaderMR = mrn.NodeID + mrn.Leader = true + } + vt.mrns[mrn.NodeID] = mrn + } + return vt, nil } diff --git a/pkg/varlogtest/varlogtest_test.go b/pkg/varlogtest/varlogtest_test.go index 52f6d58d0..dd951a0ef 100644 --- a/pkg/varlogtest/varlogtest_test.go +++ b/pkg/varlogtest/varlogtest_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "math/rand" + "slices" "sync" "testing" "time" @@ -684,6 +685,109 @@ func TestVarlogTest_Trim(t *testing.T) { assert.NoError(t, subscriber.Close()) } +func TestVarlogTestAdminMetadataRepository(t *testing.T) { + const ( + cid = types.ClusterID(1) + repfactor = 3 + ) + + ctx := context.Background() + initialMRNs := []varlogpb.MetadataRepositoryNode{ + { + NodeID: types.NewNodeIDFromURL("http://127.0.20.1:9091"), + RaftURL: "http://127.0.20.1:9091", + RPCAddr: "127.0.20.1:9092", + }, + { + NodeID: types.NewNodeIDFromURL("http://127.0.20.2:9091"), + RaftURL: "http://127.0.20.2:9091", + RPCAddr: "127.0.20.2:9092", + }, + { + NodeID: types.NewNodeIDFromURL("http://127.0.20.3:9091"), + RaftURL: "http://127.0.20.3:9091", + RPCAddr: "127.0.20.3:9092", + }, + } + + tcs := []struct { + name string + initialMRNs []varlogpb.MetadataRepositoryNode + testf func(t *testing.T, vadm varlog.Admin) + }{ + { + name: "NoInitialMRs", + initialMRNs: []varlogpb.MetadataRepositoryNode{}, + testf: func(t *testing.T, vadm varlog.Admin) { + mrns, err := vadm.ListMetadataRepositoryNodes(ctx) + require.NoError(t, err) + require.Empty(t, mrns) + }, + }, + { + name: "ListMetadataRepositoryNodes", + initialMRNs: initialMRNs, + testf: func(t *testing.T, vadm varlog.Admin) { + mrns, err := vadm.ListMetadataRepositoryNodes(ctx) + require.NoError(t, err) + require.Len(t, mrns, 3) + + for _, initMRN := range initialMRNs { + require.True(t, slices.ContainsFunc(mrns, func(mrn varlogpb.MetadataRepositoryNode) bool { + return mrn.NodeID == initMRN.NodeID + })) + } + + numLeaders := 0 + for _, mrn := range mrns { + if mrn.Leader { + numLeaders++ + } + } + require.Equal(t, 1, numLeaders) + }, + }, + { + name: "GetMetadataRepositoryNode", + initialMRNs: initialMRNs, + testf: func(t *testing.T, vadm varlog.Admin) { + for _, initialMRN := range initialMRNs { + _, err := vadm.GetMetadataRepositoryNode(ctx, initialMRN.NodeID) + require.NoError(t, err) + } + }, + }, + { + name: "GetMetadataRepositoryNode", + initialMRNs: initialMRNs, + testf: func(t *testing.T, vadm varlog.Admin) { + rsp, err := vadm.GetMRMembers(ctx) + require.NoError(t, err) + + require.NotEqual(t, types.InvalidNodeID, rsp.Leader) + require.EqualValues(t, repfactor, rsp.ReplicationFactor) + require.Len(t, rsp.Members, len(initialMRNs)) + for _, initialMRN := range initialMRNs { + require.Contains(t, rsp.Members, initialMRN.NodeID) + } + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + vt, err := varlogtest.New( + varlogtest.WithClusterID(cid), + varlogtest.WithReplicationFactor(repfactor), + varlogtest.WithInitialMetadataRepositoryNodes(tc.initialMRNs...), + ) + require.NoError(t, err) + + tc.testf(t, vt.Admin()) + }) + } +} + func TestMain(m *testing.M) { goleak.VerifyTestMain(m) }