From cfe10ea3e6cb040dead27eebf58ad0416823f7ab Mon Sep 17 00:00:00 2001 From: Injun Song Date: Mon, 31 Jul 2023 20:19:52 +0900 Subject: [PATCH] feat(admin): check replication factor in mrmanager This PR adds codes to check the replication factor in mrmanager. It compares the replication factors configured in the admin and metadata repository. --- cmd/varlogadm/cli.go | 4 +++- internal/admin/mrmanager/config.go | 13 +++++++++++++ internal/admin/mrmanager/manager.go | 5 ++++- tests/it/testenv.go | 1 + 4 files changed, 21 insertions(+), 2 deletions(-) diff --git a/cmd/varlogadm/cli.go b/cmd/varlogadm/cli.go index accadfdc5..566113827 100644 --- a/cmd/varlogadm/cli.go +++ b/cmd/varlogadm/cli.go @@ -108,6 +108,8 @@ func start(c *cli.Context) error { _ = stop(ctx) }() + repfactor := c.Int(flagReplicationFactor.Name) + mrMgr, err := mrmanager.New(context.TODO(), mrmanager.WithAddresses(c.StringSlice(flagMetadataRepository.Name)...), mrmanager.WithInitialMRConnRetryCount(c.Int(flagInitMRConnRetryCount.Name)), @@ -115,6 +117,7 @@ func start(c *cli.Context) error { mrmanager.WithMRManagerConnTimeout(c.Duration(flagMRConnTimeout.Name)), mrmanager.WithMRManagerCallTimeout(c.Duration(flagMRCallTimeout.Name)), mrmanager.WithClusterID(clusterID), + mrmanager.WithReplicationFactor(repfactor), mrmanager.WithLogger(logger), ) if err != nil { @@ -130,7 +133,6 @@ func start(c *cli.Context) error { return err } - repfactor := c.Int(flagReplicationFactor.Name) repsel, err := admin.NewReplicaSelector(c.String(flagReplicaSelector.Name), mrMgr.ClusterMetadataView(), repfactor) if err != nil { return err diff --git a/internal/admin/mrmanager/config.go b/internal/admin/mrmanager/config.go index fb1e1046b..ec15e1a43 100644 --- a/internal/admin/mrmanager/config.go +++ b/internal/admin/mrmanager/config.go @@ -2,10 +2,12 @@ package mrmanager import ( "errors" + "fmt" "time" "go.uber.org/zap" + "github.com/kakao/varlog/internal/flags" "github.com/kakao/varlog/pkg/types" ) @@ -18,6 +20,7 @@ const ( type config struct { cid types.ClusterID + repfactor int metadataRepositoryAddresses []string initialMRConnRetryCount int initialMRConnRetryBackoff time.Duration @@ -28,6 +31,7 @@ type config struct { func newConfig(opts []Option) (config, error) { cfg := config{ + repfactor: flags.DefaultReplicationFactor, initialMRConnRetryCount: DefaultInitialMRConnectRetryCount, initialMRConnRetryBackoff: DefaultInitialMRConnectRetryBackoff, connTimeout: DefaultMRConnTimeout, @@ -45,6 +49,9 @@ func newConfig(opts []Option) (config, error) { } func (cfg *config) validate() error { + if cfg.repfactor < 1 { + return fmt.Errorf("invalid replication factor %d", cfg.repfactor) + } if len(cfg.metadataRepositoryAddresses) == 0 { return errors.New("no metadata repository address") } @@ -76,6 +83,12 @@ func WithClusterID(cid types.ClusterID) Option { }) } +func WithReplicationFactor(repfactor int) Option { + return newFuncOption(func(cfg *config) { + cfg.repfactor = repfactor + }) +} + func WithAddresses(addrs ...string) Option { return newFuncOption(func(cfg *config) { cfg.metadataRepositoryAddresses = addrs diff --git a/internal/admin/mrmanager/manager.go b/internal/admin/mrmanager/manager.go index fb8e48613..426886bc9 100644 --- a/internal/admin/mrmanager/manager.go +++ b/internal/admin/mrmanager/manager.go @@ -400,7 +400,10 @@ func (mrm *mrManager) getClusterMetadataSlowly(ctx context.Context) (*varlogpb.M return nil, fmt.Errorf("cluster metadata: %w", err) } if info.ClusterID != mrm.cid { - return nil, fmt.Errorf("unexpected cluster id: expected %v, got %v", mrm.cid, info.ClusterID) + return nil, fmt.Errorf("unexpected cluster id: admin %v, metadata repository %v", mrm.cid, info.ClusterID) + } + if int(info.ReplicationFactor) != mrm.repfactor { + return nil, fmt.Errorf("unexpected replication factor: admin %v, metadata repository %v", mrm.repfactor, info.ReplicationFactor) } mrm.mu.Lock() diff --git a/tests/it/testenv.go b/tests/it/testenv.go index 393bea169..bd062f1f0 100644 --- a/tests/it/testenv.go +++ b/tests/it/testenv.go @@ -1186,6 +1186,7 @@ func (clus *VarlogCluster) initVMS(t *testing.T) { mrmanager.WithAddresses(clus.mrRPCEndpoints...), mrmanager.WithLogger(clus.logger.Logger), mrmanager.WithClusterID(clus.clusterID), + mrmanager.WithReplicationFactor(clus.nrRep), ) mrMgr, err := mrmanager.New(context.TODO(), mrMgrOpts...) require.NoError(t, err)