diff --git a/cmd/varlogadm/cli.go b/cmd/varlogadm/cli.go index 7922ec60a..accadfdc5 100644 --- a/cmd/varlogadm/cli.go +++ b/cmd/varlogadm/cli.go @@ -36,7 +36,7 @@ func newStartCommand() *cli.Command { Flags: []cli.Flag{ flagClusterID.StringFlag(false, types.ClusterID(1).String()), flagListen.StringFlag(false, admin.DefaultListenAddress), - flagReplicationFactor.UintFlag(false, admin.DefaultReplicationFactor), + flagReplicationFactor, flagLogStreamGCTimeout.DurationFlag(false, admin.DefaultLogStreamGCTimeout), flagDisableAutoLogStreamSync.BoolFlag(), flagAutoUnseal.BoolFlag(), @@ -130,8 +130,8 @@ func start(c *cli.Context) error { return err } - repfactor := c.Uint(flagReplicationFactor.Name) - repsel, err := admin.NewReplicaSelector(c.String(flagReplicaSelector.Name), mrMgr.ClusterMetadataView(), int(repfactor)) + repfactor := c.Int(flagReplicationFactor.Name) + repsel, err := admin.NewReplicaSelector(c.String(flagReplicaSelector.Name), mrMgr.ClusterMetadataView(), repfactor) if err != nil { return err } diff --git a/cmd/varlogadm/flags.go b/cmd/varlogadm/flags.go index 029426d27..d583b8552 100644 --- a/cmd/varlogadm/flags.go +++ b/cmd/varlogadm/flags.go @@ -17,11 +17,7 @@ var ( Aliases: []string{"listen-address", "rpc-bind-address"}, Envs: []string{"LISTEN", "LISTEN_ADDRESS", "RPC_BIND_ADDRESS"}, } - flagReplicationFactor = flags.FlagDesc{ - Name: "replication-factor", - Usage: "replication factor", - Envs: []string{"REPLICATION_FACTOR"}, - } + flagReplicationFactor = flags.ReplicationFactor flagLogStreamGCTimeout = flags.FlagDesc{ Name: "log-stream-gc-timeout", Envs: []string{"LOG_STREAM_GC_TIMEOUT"}, diff --git a/cmd/varlogmr/flags.go b/cmd/varlogmr/flags.go index a70766594..efa2879a2 100644 --- a/cmd/varlogmr/flags.go +++ b/cmd/varlogmr/flags.go @@ -36,12 +36,7 @@ var ( Envs: []string{"DEBUG_ADDRESS"}, } - flagReplicationFactor = flags.FlagDesc{ - Name: "log-rep-factor", - Aliases: []string{"replication-factor"}, - Usage: "Replication factor or log stream", - Envs: []string{"LOG_REP_FACTOR"}, - } + flagReplicationFactor = flags.ReplicationFactor flagRaftProposeTimeout = flags.FlagDesc{ Name: "raft-propose-timeout", diff --git a/cmd/varlogmr/metadata_repository.go b/cmd/varlogmr/metadata_repository.go index c050841f3..1690888c4 100644 --- a/cmd/varlogmr/metadata_repository.go +++ b/cmd/varlogmr/metadata_repository.go @@ -125,7 +125,7 @@ func initCLI() *cli.App { flagRPCAddr.StringFlag(false, metarepos.DefaultRPCBindAddress), flagRaftAddr.StringFlag(false, metarepos.DefaultRaftAddress), flagDebugAddr.StringFlag(false, metarepos.DefaultDebugAddress), - flagReplicationFactor.IntFlag(false, metarepos.DefaultLogReplicationFactor), + flagReplicationFactor, flagRaftProposeTimeout.DurationFlag(false, metarepos.DefaultProposeTimeout), flagRPCTimeout.DurationFlag(false, metarepos.DefaultRPCTimeout), flagCommitTick.DurationFlag(false, metarepos.DefaultCommitTick), diff --git a/internal/admin/admin.go b/internal/admin/admin.go index b0037021b..17e563c69 100644 --- a/internal/admin/admin.go +++ b/internal/admin/admin.go @@ -137,7 +137,7 @@ func (adm *Admin) Serve() error { if ce := adm.logger.Check(zap.InfoLevel, "starting"); ce != nil { ce.Write( zap.String("address", adm.serverAddr), - zap.Uint("replicationFactor", adm.replicationFactor), + zap.Int("replicationFactor", adm.replicationFactor), zap.String("replicationSelector", adm.snSelector.Name()), zap.Duration("logStreamGCTimeout", adm.logStreamGCTimeout), ) @@ -674,7 +674,7 @@ func (adm *Admin) addLogStreamInternal(ctx context.Context, tpid types.TopicID, func (adm *Admin) verifyLogStream(clusmeta *varlogpb.MetadataDescriptor, lsdesc *varlogpb.LogStreamDescriptor) error { replicas := lsdesc.GetReplicas() // the number of logstream replica - if uint(len(replicas)) != adm.replicationFactor { + if len(replicas) != adm.replicationFactor { return status.Errorf(codes.FailedPrecondition, "add log stream: invalid number of log stream replicas: expected %d, actual %d", adm.replicationFactor, len(replicas)) } // storagenode existence diff --git a/internal/admin/config.go b/internal/admin/config.go index f74b9da99..c6bfdcd16 100644 --- a/internal/admin/config.go +++ b/internal/admin/config.go @@ -11,20 +11,21 @@ import ( "github.com/kakao/varlog/internal/admin/snmanager" "github.com/kakao/varlog/internal/admin/snwatcher" "github.com/kakao/varlog/internal/admin/stats" + "github.com/kakao/varlog/internal/flags" "github.com/kakao/varlog/pkg/types" ) const ( DefaultClusterID = types.ClusterID(1) DefaultListenAddress = "127.0.0.1:9090" - DefaultReplicationFactor = 1 + DefaultReplicationFactor = flags.DefaultReplicationFactor DefaultLogStreamGCTimeout = 24 * time.Hour ) type config struct { cid types.ClusterID listenAddress string - replicationFactor uint + replicationFactor int logStreamGCTimeout time.Duration disableAutoLogStreamSync bool enableAutoUnseal bool @@ -123,7 +124,7 @@ func WithListenAddress(listen string) Option { }) } -func WithReplicationFactor(replicationFactor uint) Option { +func WithReplicationFactor(replicationFactor int) Option { return newFuncOption(func(cfg *config) { cfg.replicationFactor = replicationFactor }) diff --git a/internal/flags/cluster.go b/internal/flags/cluster.go new file mode 100644 index 000000000..2f0e1ce17 --- /dev/null +++ b/internal/flags/cluster.go @@ -0,0 +1,28 @@ +package flags + +import ( + "fmt" + + "github.com/urfave/cli/v2" +) + +const ( + CategoryCluster = "Cluster:" + + DefaultReplicationFactor = 1 +) + +var ( + ReplicationFactor = &cli.IntFlag{ + Name: "replication-factor", + Category: CategoryCluster, + EnvVars: []string{"REPLICATION_FACTOR"}, + Value: DefaultReplicationFactor, + Action: func(_ *cli.Context, value int) error { + if value <= 0 { + return fmt.Errorf("invalid value \"%d\" for flag --replication-factor", value) + } + return nil + }, + } +) diff --git a/internal/flags/flags_test.go b/internal/flags/flags_test.go new file mode 100644 index 000000000..7ca1e0314 --- /dev/null +++ b/internal/flags/flags_test.go @@ -0,0 +1,52 @@ +package flags + +import ( + "io" + "testing" + + "github.com/stretchr/testify/require" + "github.com/urfave/cli/v2" +) + +func TestReplicationFactor(t *testing.T) { + tcs := []struct { + name string + args []string + ok bool + }{ + { + name: "replication_factor=1", + args: []string{"test", "--replication-factor=1"}, + ok: true, + }, + { + name: "replication_factor=0", + args: []string{"test", "--replication-factor=0"}, + ok: false, + }, + { + name: "replication_factor=-1", + args: []string{"test", "--replication-factor=-1"}, + ok: false, + }, + } + + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + app := &cli.App{ + Name: "test", + Flags: []cli.Flag{ + ReplicationFactor, + }, + Writer: io.Discard, + } + err := app.Run(tc.args) + if !tc.ok { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} diff --git a/internal/metarepos/config.go b/internal/metarepos/config.go index 767bbdbb8..cb82f04a4 100644 --- a/internal/metarepos/config.go +++ b/internal/metarepos/config.go @@ -10,6 +10,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/kakao/varlog/internal/flags" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/netutil" ) @@ -23,7 +24,7 @@ const ( DefaultSnapshotCatchUpCount uint64 = 10000 DefaultSnapshotPurgeCount uint = 10 DefaultWalPurgeCount uint = 10 - DefaultLogReplicationFactor int = 1 + DefaultLogReplicationFactor int = flags.DefaultReplicationFactor DefaultProposeTimeout = 100 * time.Millisecond DefaultRaftTick = 100 * time.Millisecond DefaultRPCTimeout = 100 * time.Millisecond diff --git a/tests/ee/cluster/local/metarepos/node.go b/tests/ee/cluster/local/metarepos/node.go index 0726d4168..7db1f2eb7 100644 --- a/tests/ee/cluster/local/metarepos/node.go +++ b/tests/ee/cluster/local/metarepos/node.go @@ -96,7 +96,7 @@ func (node *Node) arguments(t *testing.T) []string { "--cluster-id", node.cid.String(), "--raft-address", node.raftURL, "--bind", node.rpcAddr, - "--log-rep-factor", strconv.Itoa(node.replicationFactor), + "--replication-factor", strconv.Itoa(node.replicationFactor), "--raft-dir", node.raftDir, "--log-dir", node.logDir, } diff --git a/tests/it/testenv.go b/tests/it/testenv.go index bc8754fa6..5c83005ed 100644 --- a/tests/it/testenv.go +++ b/tests/it/testenv.go @@ -1200,7 +1200,7 @@ func (clus *VarlogCluster) initVMS(t *testing.T) { opts := append(clus.VMSOpts, admin.WithListenAddress(listenAddress), admin.WithClusterID(clus.clusterID), - admin.WithReplicationFactor(uint(clus.nrRep)), + admin.WithReplicationFactor(clus.nrRep), admin.WithLogger(clus.logger.Named("admin")), admin.WithMetadataRepositoryManager(mrMgr), admin.WithStorageNodeManager(snMgr),