Skip to content

Commit

Permalink
feat(admin): check replication factor in mrmanager
Browse files Browse the repository at this point in the history
This PR adds codes to check the replication factor in mrmanager. It compares the replication factors
configured in the admin and metadata repository.
  • Loading branch information
ijsong committed Sep 11, 2023
1 parent d01fafa commit cfe10ea
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 2 deletions.
4 changes: 3 additions & 1 deletion cmd/varlogadm/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,16 @@ func start(c *cli.Context) error {
_ = stop(ctx)
}()

repfactor := c.Int(flagReplicationFactor.Name)

mrMgr, err := mrmanager.New(context.TODO(),
mrmanager.WithAddresses(c.StringSlice(flagMetadataRepository.Name)...),
mrmanager.WithInitialMRConnRetryCount(c.Int(flagInitMRConnRetryCount.Name)),
mrmanager.WithInitialMRConnRetryBackoff(c.Duration(flagInitMRConnRetryBackoff.Name)),
mrmanager.WithMRManagerConnTimeout(c.Duration(flagMRConnTimeout.Name)),
mrmanager.WithMRManagerCallTimeout(c.Duration(flagMRCallTimeout.Name)),
mrmanager.WithClusterID(clusterID),
mrmanager.WithReplicationFactor(repfactor),
mrmanager.WithLogger(logger),
)
if err != nil {
Expand All @@ -130,7 +133,6 @@ func start(c *cli.Context) error {
return err
}

repfactor := c.Int(flagReplicationFactor.Name)
repsel, err := admin.NewReplicaSelector(c.String(flagReplicaSelector.Name), mrMgr.ClusterMetadataView(), repfactor)
if err != nil {
return err
Expand Down
13 changes: 13 additions & 0 deletions internal/admin/mrmanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package mrmanager

import (
"errors"
"fmt"
"time"

"go.uber.org/zap"

"github.com/kakao/varlog/internal/flags"
"github.com/kakao/varlog/pkg/types"
)

Expand All @@ -18,6 +20,7 @@ const (

type config struct {
cid types.ClusterID
repfactor int
metadataRepositoryAddresses []string
initialMRConnRetryCount int
initialMRConnRetryBackoff time.Duration
Expand All @@ -28,6 +31,7 @@ type config struct {

func newConfig(opts []Option) (config, error) {
cfg := config{
repfactor: flags.DefaultReplicationFactor,
initialMRConnRetryCount: DefaultInitialMRConnectRetryCount,
initialMRConnRetryBackoff: DefaultInitialMRConnectRetryBackoff,
connTimeout: DefaultMRConnTimeout,
Expand All @@ -45,6 +49,9 @@ func newConfig(opts []Option) (config, error) {
}

func (cfg *config) validate() error {
if cfg.repfactor < 1 {
return fmt.Errorf("invalid replication factor %d", cfg.repfactor)
}
if len(cfg.metadataRepositoryAddresses) == 0 {
return errors.New("no metadata repository address")
}
Expand Down Expand Up @@ -76,6 +83,12 @@ func WithClusterID(cid types.ClusterID) Option {
})
}

func WithReplicationFactor(repfactor int) Option {
return newFuncOption(func(cfg *config) {
cfg.repfactor = repfactor
})
}

func WithAddresses(addrs ...string) Option {
return newFuncOption(func(cfg *config) {
cfg.metadataRepositoryAddresses = addrs
Expand Down
5 changes: 4 additions & 1 deletion internal/admin/mrmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,10 @@ func (mrm *mrManager) getClusterMetadataSlowly(ctx context.Context) (*varlogpb.M
return nil, fmt.Errorf("cluster metadata: %w", err)
}
if info.ClusterID != mrm.cid {
return nil, fmt.Errorf("unexpected cluster id: expected %v, got %v", mrm.cid, info.ClusterID)
return nil, fmt.Errorf("unexpected cluster id: admin %v, metadata repository %v", mrm.cid, info.ClusterID)
}
if int(info.ReplicationFactor) != mrm.repfactor {
return nil, fmt.Errorf("unexpected replication factor: admin %v, metadata repository %v", mrm.repfactor, info.ReplicationFactor)
}

mrm.mu.Lock()
Expand Down
1 change: 1 addition & 0 deletions tests/it/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,7 @@ func (clus *VarlogCluster) initVMS(t *testing.T) {
mrmanager.WithAddresses(clus.mrRPCEndpoints...),
mrmanager.WithLogger(clus.logger.Logger),
mrmanager.WithClusterID(clus.clusterID),
mrmanager.WithReplicationFactor(clus.nrRep),
)
mrMgr, err := mrmanager.New(context.TODO(), mrMgrOpts...)
require.NoError(t, err)
Expand Down

0 comments on commit cfe10ea

Please sign in to comment.