Skip to content

Commit

Permalink
fix(admin): refresh cluster metadata while adding storage node (#558)
Browse files Browse the repository at this point in the history
### What this PR does

It fixes a transient error caused by fetching cluster metadata too late while handling AddStorageNode. This PR makes the AddStorageNode RPC handler forcibly fetch the cluster metadata and wait for it to be updated. Although the error isn't critical and can be resolved by a simple retry, fetching the cluster metadata resolves it quickly.

Error example:

```
--- FAIL: TestVarlogAppend (5.38s)
    controller.go:62:
                Error Trace:    /home/runner/work/varlog/varlog/tests/ee/controller/controller.go:62
                                                        /home/runner/work/varlog/varlog/tests/ee/cluster/local/cluster.go:238
                                                        /home/runner/work/varlog/varlog/tests/ee/cluster/local/cluster.go:63
                                                        /home/runner/work/varlog/varlog/tests/ee/ee_test.go:32
                Error:          Received unexpected error:
                                exit status 255
                Test:           TestVarlogAppend
                Messages:       /home/runner/work/varlog/varlog/bin/varlogctl storagenode add --storage-node-id 1 --storage-node-address 127.0.0.1:20001 --admin 127.0.0.1:9093: rpc error: code = Unavailable desc = add storage node: call again
```
  • Loading branch information
ijsong authored Oct 4, 2023
2 parents 11859e0 + a2e0326 commit 4cb06f1
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,33 @@ func (adm *Admin) addStorageNode(ctx context.Context, snid types.StorageNodeID,
return nil, status.Errorf(status.Code(err), "add storage node: %s", err.Error())
}

adm.snmgr.AddStorageNode(ctx, snmd.StorageNode.StorageNodeID, addr)
adm.statRepository.Report(ctx, snmd, now)
snm, ok := adm.statRepository.GetStorageNode(snid)
if !ok {
return nil, status.Error(codes.Unavailable, "add storage node: call again")
confirm := func() *admpb.StorageNodeMetadata {
adm.snmgr.AddStorageNode(ctx, snmd.StorageNode.StorageNodeID, addr)
_, _ = adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx) // Fetch new cluster metadata.
adm.statRepository.Report(ctx, snmd, now)
if snm, ok := adm.statRepository.GetStorageNode(snid); ok {
return snm
}
return nil
}

if snm := confirm(); snm != nil {
return snm, nil
}

const interval = 100 * time.Millisecond
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if snm := confirm(); snm != nil {
return snm, nil
}
case <-ctx.Done():
return nil, status.Error(codes.Unavailable, "add storage node: call again")
}
}
return snm, nil
}

func (adm *Admin) unregisterStorageNode(ctx context.Context, snid types.StorageNodeID) error {
Expand Down

0 comments on commit 4cb06f1

Please sign in to comment.