Skip to content

Commit

Permalink
feat(admin): check cluster id in mrmanager (#555)
Browse files Browse the repository at this point in the history
### What this PR does

This PR adds codes to check cluster id in mrmanager. Because the admin server doesn't have explicit health checking, comparing cluster id is performed in ClusterMetadataViewer.
In the next PR, checking the replication factor will be added.
  • Loading branch information
ijsong authored Oct 4, 2023
2 parents 52257ff + b958fe0 commit 7794a6d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
25 changes: 24 additions & 1 deletion internal/admin/mrmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"

"github.com/kakao/varlog/pkg/mrc"
Expand Down Expand Up @@ -126,6 +127,7 @@ func New(ctx context.Context, opts ...Option) (MetadataRepositoryManager, error)
continue
}
return &mrManager{
config: cfg,
dirty: true,
connector: connector,
}, nil
Expand Down Expand Up @@ -375,11 +377,32 @@ func (mrm *mrManager) ClusterMetadata(ctx context.Context) (*varlogpb.MetadataDe
mrm.mu.RUnlock()

// slow path
return mrm.getClusterMetadataSlowly(ctx)
}

func (mrm *mrManager) getClusterMetadataSlowly(ctx context.Context) (*varlogpb.MetadataDescriptor, error) {
md, err, _ := mrm.sfg.Do("cluster_metadata", func() (interface{}, error) {
meta, err := mrm.clusterMetadata(ctx)
var (
meta *varlogpb.MetadataDescriptor
info *mrpb.ClusterInfo
)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() (err error) {
meta, err = mrm.clusterMetadata(ctx)
return err
})
eg.Go(func() (err error) {
info, err = mrm.GetClusterInfo(ctx)
return err
})
err := eg.Wait()
if err != nil {
return nil, fmt.Errorf("cluster metadata: %w", err)
}
if info.ClusterID != mrm.cid {
return nil, fmt.Errorf("unexpected cluster id: expected %v, got %v", mrm.cid, info.ClusterID)
}

mrm.mu.Lock()
mrm.meta = meta
mrm.dirty = false
Expand Down
1 change: 1 addition & 0 deletions tests/it/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,7 @@ func (clus *VarlogCluster) initVMS(t *testing.T) {
mrMgrOpts := append(clus.mrMgrOpts,
mrmanager.WithAddresses(clus.mrRPCEndpoints...),
mrmanager.WithLogger(clus.logger.Logger),
mrmanager.WithClusterID(clus.clusterID),
)
mrMgr, err := mrmanager.New(context.TODO(), mrMgrOpts...)
require.NoError(t, err)
Expand Down

0 comments on commit 7794a6d

Please sign in to comment.