Skip to content

Commit

Permalink
fix(admin): refresh cluster metadata while adding storage node
Browse files Browse the repository at this point in the history
It fixes a transient error caused by fetching cluster metadata too lately while handling
AddStorageNode. This PR lets the AddStorageNode RPC handler fetch the cluster metadata forcibly and
wait for them to be updated. Although it isn't a critical error and can be resolved through a simple
retry, it is resolved quickly by fetching the cluster metadata.

Error example:

```
--- FAIL: TestVarlogAppend (5.38s)
    controller.go:62:
        	Error Trace:	/home/runner/work/varlog/varlog/tests/ee/controller/controller.go:62
        	            				/home/runner/work/varlog/varlog/tests/ee/cluster/local/cluster.go:238
        	            				/home/runner/work/varlog/varlog/tests/ee/cluster/local/cluster.go:63
        	            				/home/runner/work/varlog/varlog/tests/ee/ee_test.go:32
        	Error:      	Received unexpected error:
        	            	exit status 255
        	Test:       	TestVarlogAppend
        	Messages:   	/home/runner/work/varlog/varlog/bin/varlogctl storagenode add --storage-node-id 1 --storage-node-address 127.0.0.1:20001 --admin 127.0.0.1:9093: rpc error: code = Unavailable desc = add storage node: call again
```
  • Loading branch information
ijsong committed Sep 25, 2023
1 parent b499ab0 commit a2e0326
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,33 @@ func (adm *Admin) addStorageNode(ctx context.Context, snid types.StorageNodeID,
return nil, status.Errorf(status.Code(err), "add storage node: %s", err.Error())
}

adm.snmgr.AddStorageNode(ctx, snmd.StorageNode.StorageNodeID, addr)
adm.statRepository.Report(ctx, snmd, now)
snm, ok := adm.statRepository.GetStorageNode(snid)
if !ok {
return nil, status.Error(codes.Unavailable, "add storage node: call again")
confirm := func() *admpb.StorageNodeMetadata {
adm.snmgr.AddStorageNode(ctx, snmd.StorageNode.StorageNodeID, addr)
_, _ = adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx) // Fetch new cluster metadata.
adm.statRepository.Report(ctx, snmd, now)
if snm, ok := adm.statRepository.GetStorageNode(snid); ok {
return snm
}
return nil
}

if snm := confirm(); snm != nil {
return snm, nil
}

const interval = 100 * time.Millisecond
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if snm := confirm(); snm != nil {
return snm, nil
}
case <-ctx.Done():
return nil, status.Error(codes.Unavailable, "add storage node: call again")
}
}
return snm, nil
}

func (adm *Admin) unregisterStorageNode(ctx context.Context, snid types.StorageNodeID) error {
Expand Down

0 comments on commit a2e0326

Please sign in to comment.