Skip to content

Commit

Permalink
refactor: add the standard flag --replication-factor
Browse files Browse the repository at this point in the history
Previously varlogadm and varlogmr defined the flag for replication factor, respectively, which made
strings for the flag different.
This change adds a new standard flag, `--replication-factor`. It also sets the type of replication
factor to an integer.
  • Loading branch information
ijsong committed Sep 25, 2023
1 parent 584d38c commit b032afa
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 23 deletions.
6 changes: 3 additions & 3 deletions cmd/varlogadm/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newStartCommand() *cli.Command {
Flags: []cli.Flag{
flagClusterID.StringFlag(false, types.ClusterID(1).String()),
flagListen.StringFlag(false, admin.DefaultListenAddress),
flagReplicationFactor.UintFlag(false, admin.DefaultReplicationFactor),
flagReplicationFactor,
flagLogStreamGCTimeout.DurationFlag(false, admin.DefaultLogStreamGCTimeout),
flagDisableAutoLogStreamSync.BoolFlag(),
flagAutoUnseal.BoolFlag(),
Expand Down Expand Up @@ -130,8 +130,8 @@ func start(c *cli.Context) error {
return err
}

repfactor := c.Uint(flagReplicationFactor.Name)
repsel, err := admin.NewReplicaSelector(c.String(flagReplicaSelector.Name), mrMgr.ClusterMetadataView(), int(repfactor))
repfactor := c.Int(flagReplicationFactor.Name)
repsel, err := admin.NewReplicaSelector(c.String(flagReplicaSelector.Name), mrMgr.ClusterMetadataView(), repfactor)
if err != nil {
return err
}
Expand Down
6 changes: 1 addition & 5 deletions cmd/varlogadm/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@ var (
Aliases: []string{"listen-address", "rpc-bind-address"},
Envs: []string{"LISTEN", "LISTEN_ADDRESS", "RPC_BIND_ADDRESS"},
}
flagReplicationFactor = flags.FlagDesc{
Name: "replication-factor",
Usage: "replication factor",
Envs: []string{"REPLICATION_FACTOR"},
}
flagReplicationFactor = flags.ReplicationFactor
flagLogStreamGCTimeout = flags.FlagDesc{
Name: "log-stream-gc-timeout",
Envs: []string{"LOG_STREAM_GC_TIMEOUT"},
Expand Down
7 changes: 1 addition & 6 deletions cmd/varlogmr/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,7 @@ var (
Envs: []string{"DEBUG_ADDRESS"},
}

flagReplicationFactor = flags.FlagDesc{
Name: "log-rep-factor",
Aliases: []string{"replication-factor"},
Usage: "Replication factor or log stream",
Envs: []string{"LOG_REP_FACTOR"},
}
flagReplicationFactor = flags.ReplicationFactor

flagRaftProposeTimeout = flags.FlagDesc{
Name: "raft-propose-timeout",
Expand Down
2 changes: 1 addition & 1 deletion cmd/varlogmr/metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func initCLI() *cli.App {
flagRPCAddr.StringFlag(false, metarepos.DefaultRPCBindAddress),
flagRaftAddr.StringFlag(false, metarepos.DefaultRaftAddress),
flagDebugAddr.StringFlag(false, metarepos.DefaultDebugAddress),
flagReplicationFactor.IntFlag(false, metarepos.DefaultLogReplicationFactor),
flagReplicationFactor,
flagRaftProposeTimeout.DurationFlag(false, metarepos.DefaultProposeTimeout),
flagRPCTimeout.DurationFlag(false, metarepos.DefaultRPCTimeout),
flagCommitTick.DurationFlag(false, metarepos.DefaultCommitTick),
Expand Down
4 changes: 2 additions & 2 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (adm *Admin) Serve() error {
if ce := adm.logger.Check(zap.InfoLevel, "starting"); ce != nil {
ce.Write(
zap.String("address", adm.serverAddr),
zap.Uint("replicationFactor", adm.replicationFactor),
zap.Int("replicationFactor", adm.replicationFactor),
zap.String("replicationSelector", adm.snSelector.Name()),
zap.Duration("logStreamGCTimeout", adm.logStreamGCTimeout),
)
Expand Down Expand Up @@ -695,7 +695,7 @@ func (adm *Admin) addLogStreamInternal(ctx context.Context, tpid types.TopicID,
func (adm *Admin) verifyLogStream(clusmeta *varlogpb.MetadataDescriptor, lsdesc *varlogpb.LogStreamDescriptor) error {
replicas := lsdesc.GetReplicas()
// the number of logstream replica
if uint(len(replicas)) != adm.replicationFactor {
if len(replicas) != adm.replicationFactor {
return status.Errorf(codes.FailedPrecondition, "add log stream: invalid number of log stream replicas: expected %d, actual %d", adm.replicationFactor, len(replicas))
}
// storagenode existence
Expand Down
7 changes: 4 additions & 3 deletions internal/admin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import (
"github.com/kakao/varlog/internal/admin/snmanager"
"github.com/kakao/varlog/internal/admin/snwatcher"
"github.com/kakao/varlog/internal/admin/stats"
"github.com/kakao/varlog/internal/flags"
"github.com/kakao/varlog/pkg/types"
)

const (
DefaultClusterID = types.ClusterID(1)
DefaultListenAddress = "127.0.0.1:9090"
DefaultReplicationFactor = 1
DefaultReplicationFactor = flags.DefaultReplicationFactor
DefaultLogStreamGCTimeout = 24 * time.Hour
)

type config struct {
cid types.ClusterID
listenAddress string
replicationFactor uint
replicationFactor int
logStreamGCTimeout time.Duration
disableAutoLogStreamSync bool
enableAutoUnseal bool
Expand Down Expand Up @@ -123,7 +124,7 @@ func WithListenAddress(listen string) Option {
})
}

func WithReplicationFactor(replicationFactor uint) Option {
func WithReplicationFactor(replicationFactor int) Option {
return newFuncOption(func(cfg *config) {
cfg.replicationFactor = replicationFactor
})
Expand Down
28 changes: 28 additions & 0 deletions internal/flags/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package flags

import (
"fmt"

"github.com/urfave/cli/v2"
)

const (
CategoryCluster = "Cluster:"

DefaultReplicationFactor = 1
)

var (
ReplicationFactor = &cli.IntFlag{
Name: "replication-factor",
Category: CategoryCluster,
EnvVars: []string{"REPLICATION_FACTOR"},
Value: DefaultReplicationFactor,
Action: func(_ *cli.Context, value int) error {
if value <= 0 {
return fmt.Errorf("invalid value \"%d\" for flag --replication-factor", value)
}
return nil
},
}
)
52 changes: 52 additions & 0 deletions internal/flags/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package flags

import (
"io"
"testing"

"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
)

func TestReplicationFactor(t *testing.T) {
tcs := []struct {
name string
args []string
ok bool
}{
{
name: "replication_factor=1",
args: []string{"test", "--replication-factor=1"},
ok: true,
},
{
name: "replication_factor=0",
args: []string{"test", "--replication-factor=0"},
ok: false,
},
{
name: "replication_factor=-1",
args: []string{"test", "--replication-factor=-1"},
ok: false,
},
}

for _, tc := range tcs {
tc := tc
t.Run(tc.name, func(t *testing.T) {
app := &cli.App{
Name: "test",
Flags: []cli.Flag{
ReplicationFactor,
},
Writer: io.Discard,
}
err := app.Run(tc.args)
if !tc.ok {
require.Error(t, err)
return
}
require.NoError(t, err)
})
}
}
3 changes: 2 additions & 1 deletion internal/metarepos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/kakao/varlog/internal/flags"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/netutil"
)
Expand All @@ -23,7 +24,7 @@ const (
DefaultSnapshotCatchUpCount uint64 = 10000
DefaultSnapshotPurgeCount uint = 10
DefaultWalPurgeCount uint = 10
DefaultLogReplicationFactor int = 1
DefaultLogReplicationFactor int = flags.DefaultReplicationFactor
DefaultProposeTimeout = 100 * time.Millisecond
DefaultRaftTick = 100 * time.Millisecond
DefaultRPCTimeout = 100 * time.Millisecond
Expand Down
2 changes: 1 addition & 1 deletion tests/ee/cluster/local/metarepos/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (node *Node) arguments(t *testing.T) []string {
"--cluster-id", node.cid.String(),
"--raft-address", node.raftURL,
"--bind", node.rpcAddr,
"--log-rep-factor", strconv.Itoa(node.replicationFactor),
"--replication-factor", strconv.Itoa(node.replicationFactor),
"--raft-dir", node.raftDir,
"--log-dir", node.logDir,
}
Expand Down
2 changes: 1 addition & 1 deletion tests/it/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ func (clus *VarlogCluster) initVMS(t *testing.T) {
opts := append(clus.VMSOpts,
admin.WithListenAddress(listenAddress),
admin.WithClusterID(clus.clusterID),
admin.WithReplicationFactor(uint(clus.nrRep)),
admin.WithReplicationFactor(clus.nrRep),
admin.WithLogger(clus.logger.Named("admin")),
admin.WithMetadataRepositoryManager(mrMgr),
admin.WithStorageNodeManager(snMgr),
Expand Down

0 comments on commit b032afa

Please sign in to comment.