backport of commit ce74f4f (#12166)
Add some metrics helpful for monitoring raft cluster state.

Additionally, we weren't emitting bolt metrics on regular (non-performance) standbys, and there were other metrics
in metricsLoop that would make sense to include in OSS but weren't. There is now an active-node-only function,
emitMetricsActiveNode, which runs metricsLoop on the active node. Standbys and performance standbys run metricsLoop
from a goroutine managed by the runStandby run group.
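A minimal sketch of the split described above, not Vault's actual wiring: the active node starts the metrics loop directly from the unseal path, while standbys run it as one actor in a run group (the runStandby hunk below uses github.com/oklog/run). The gauge emission is stubbed out with a print.

```go
package main

import (
	"fmt"
	"time"

	"github.com/oklog/run"
)

// metricsLoop emits gauges on a ticker until stopCh is closed.
func metricsLoop(stopCh <-chan struct{}) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("emit gauges") // stand-in for sink.SetGaugeWithLabels calls
		case <-stopCh:
			return
		}
	}
}

// Active-node path (emitMetricsActiveNode-style): started on unseal, stopped
// by closing the channel on seal or step-down.
func startActiveNodeMetrics() (stop func()) {
	stopCh := make(chan struct{})
	go metricsLoop(stopCh)
	return func() { close(stopCh) }
}

// Standby/perf-standby path: metricsLoop becomes an actor in the run group,
// so it is interrupted when any other actor in the group exits.
func addStandbyMetrics(g *run.Group) {
	stopCh := make(chan struct{})
	g.Add(func() error {
		metricsLoop(stopCh)
		return nil
	}, func(error) {
		close(stopCh)
	})
}

func main() {
	// Standby flavor: the second actor ends the group after a few seconds,
	// standing in for the stop/leadership actors in runStandby.
	var g run.Group
	addStandbyMetrics(&g)
	g.Add(func() error { time.Sleep(3 * time.Second); return nil }, func(error) {})
	_ = g.Run()

	// Active-node flavor.
	stop := startActiveNodeMetrics()
	time.Sleep(2 * time.Second)
	stop()
}
```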
ncabatoff authored and miagilepner committed Jun 15, 2023
1 parent ae23611 commit deed75b
Showing 9 changed files with 77 additions and 24 deletions.
3 changes: 3 additions & 0 deletions changelog/12166.txt
@@ -0,0 +1,3 @@
```release-note:improvement
storage/raft: add additional raft metrics relating to applied index and heartbeating; also ensure OSS standbys emit periodic metrics.
```
2 changes: 2 additions & 0 deletions physical/raft/fsm.go
@@ -125,6 +125,8 @@ func NewFSM(path string, localID string, logger log.Logger) (*FSM, error) {
})

dbPath := filepath.Join(path, databaseFilename)
f.l.Lock()
defer f.l.Unlock()
if err := f.openDBFile(dbPath); err != nil {
return nil, fmt.Errorf("failed to open bolt file: %w", err)
}
13 changes: 13 additions & 0 deletions physical/raft/raft.go
@@ -571,9 +571,22 @@ func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) {
b.l.RLock()
logstoreStats := b.stableStore.(*raftboltdb.BoltStore).Stats()
fsmStats := b.fsm.Stats()
stats := b.raft.Stats()
b.l.RUnlock()
b.collectMetricsWithStats(logstoreStats, sink, "logstore")
b.collectMetricsWithStats(fsmStats, sink, "fsm")
labels := []metrics.Label{
{
Name: "peer_id",
Value: b.localID,
},
}
for _, key := range []string{"term", "commit_index", "applied_index", "fsm_pending"} {
n, err := strconv.ParseUint(stats[key], 10, 64)
if err == nil {
sink.SetGaugeWithLabels([]string{"raft_storage", "stats", key}, float32(n), labels)
}
}
}

func (b *RaftBackend) collectMetricsWithStats(stats bolt.Stats, sink *metricsutil.ClusterMetricSink, database string) {
22 changes: 18 additions & 4 deletions physical/raft/raft_autopilot.go
@@ -522,11 +522,25 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() {
tickerCh := b.followerHeartbeatTicker.C
b.l.RUnlock()

followerGauge := func(peerID string, suffix string, value float32) {
labels := []metrics.Label{
{
Name: "peer_id",
Value: peerID,
},
}
metrics.SetGaugeWithLabels([]string{"raft_storage", "follower", suffix}, value, labels)
}
for range tickerCh {
b.l.RLock()
if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 {
b.followerStates.l.RLock()
for _, state := range b.followerStates.followers {
b.followerStates.l.RLock()
myAppliedIndex := b.raft.AppliedIndex()
for peerID, state := range b.followerStates.followers {
timeSinceLastHeartbeat := time.Now().Sub(state.LastHeartbeat) / time.Millisecond
followerGauge(peerID, "last_heartbeat_ms", float32(timeSinceLastHeartbeat))
followerGauge(peerID, "applied_index_delta", float32(myAppliedIndex-state.AppliedIndex))

if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 {
if state.LastHeartbeat.IsZero() || state.IsDead.Load() {
continue
}
@@ -535,8 +549,8 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() {
state.IsDead.Store(true)
}
}
b.followerStates.l.RUnlock()
}
b.followerStates.l.RUnlock()
b.l.RUnlock()
}
}
6 changes: 3 additions & 3 deletions vault/core.go
@@ -2239,6 +2239,9 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c
return err
}

c.metricsCh = make(chan struct{})
go c.emitMetricsActiveNode(c.metricsCh)

return nil
}

@@ -2297,9 +2300,6 @@ func (c *Core) postUnseal(ctx context.Context, ctxCancelFunc context.CancelFunc,
seal.StartHealthCheck()
}

c.metricsCh = make(chan struct{})
go c.emitMetrics(c.metricsCh)

// This is intentionally the last block in this function. We want to allow
// writes just before allowing client requests, to ensure everything has
// been set up properly before any writes can have happened.
23 changes: 10 additions & 13 deletions vault/core_metrics.go
@@ -106,16 +106,16 @@ func (c *Core) metricsLoop(stopCh chan struct{}) {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "secondary"}, 0, nil)
}

// If we're using a raft backend, emit raft metrics
if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
rb.CollectMetrics(c.MetricSink())
}

// Capture the total number of in-flight requests
c.inFlightReqGaugeMetric()

// Refresh gauge metrics that are looped
c.cachedGaugeMetricsEmitter()

// If we're using a raft backend, emit boltdb metrics
if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
rb.CollectMetrics(c.MetricSink())
}
case <-writeTimer:
if stopped := grabLockOrStop(c.stateLock.RLock, c.stateLock.RUnlock, stopCh); stopped {
return
@@ -223,15 +223,12 @@ func (c *Core) tokenGaugeTtlCollector(ctx context.Context) ([]metricsutil.GaugeL
return ts.gaugeCollectorByTtl(ctx)
}

// emitMetrics is used to start all the periodc metrics; all of them should
// be shut down when stopCh is closed.
func (c *Core) emitMetrics(stopCh chan struct{}) {
// emitMetricsActiveNode is used to start all the periodic metrics; all of them should
// be shut down when stopCh is closed. This code runs on the active node only.
func (c *Core) emitMetricsActiveNode(stopCh chan struct{}) {
// The gauge collection processes are started and stopped here
// because there's more than one TokenManager created during startup,
// but we only want one set of gauges.
//
// Both active nodes and performance standby nodes call emitMetrics
// so we have to handle both.
metricsInit := []struct {
MetricName []string
MetadataLabel []metrics.Label
@@ -340,8 +337,8 @@ func (c *Core) findKvMounts() []*kvMount {
c.mountsLock.RLock()
defer c.mountsLock.RUnlock()

// emitMetrics doesn't grab the statelock, so this code might run during or after the seal process.
// Therefore, we need to check if c.mounts is nil. If we do not, emitMetrics will panic if this is
// we don't grab the statelock, so this code might run during or after the seal process.
// Therefore, we need to check if c.mounts is nil. If we do not, this will panic when
// run after seal.
if c.mounts == nil {
return mounts
11 changes: 11 additions & 0 deletions vault/ha.go
@@ -389,6 +389,17 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) {
c.logger.Debug("shutting down periodic leader refresh")
})
}
{
metricsStop := make(chan struct{})

g.Add(func() error {
c.metricsLoop(metricsStop)
return nil
}, func(error) {
close(metricsStop)
c.logger.Debug("shutting down periodic metrics")
})
}
{
// Wait for leadership
leaderStopCh := make(chan struct{})
6 changes: 6 additions & 0 deletions vault/request_forwarding_rpc.go
@@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/vault/helper/forwarding"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/helper/consts"
@@ -136,6 +137,9 @@ func (c *forwardingClient) startHeartbeat() {
Mode: "standby",
}
tick := func() {
labels := make([]metrics.Label, 0, 1)
defer metrics.MeasureSinceWithLabels([]string{"ha", "rpc", "client", "echo"}, time.Now(), labels)

req := &EchoRequest{
Message: "ping",
ClusterAddr: clusterAddr,
@@ -150,12 +154,14 @@
req.RaftDesiredSuffrage = raftBackend.DesiredSuffrage()
req.RaftRedundancyZone = raftBackend.RedundancyZone()
req.RaftUpgradeVersion = raftBackend.EffectiveVersion()
labels = append(labels, metrics.Label{Name: "peer_id", Value: raftBackend.NodeID()})
}

ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second)
resp, err := c.RequestForwardingClient.Echo(ctx, req)
cancel()
if err != nil {
metrics.IncrCounter([]string{"ha", "rpc", "client", "echo", "errors"}, 1)
c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err)
return
}
15 changes: 11 additions & 4 deletions website/content/docs/internals/telemetry.mdx
@@ -222,10 +222,12 @@ These metrics relate to internal operations on Merkle Trees and Write Ahead Logs

These metrics are emitted on standbys when talking to the active node, and in some cases by performance standbys as well.

| Metric | Description | Unit | Type |
| :----------------------------------- | :---------------------------------------------------------------- | :----- | :------ |
| `vault.ha.rpc.client.forward` | Time taken to forward a request from a standby to the active node | ms | summary |
| `vault.ha.rpc.client.forward.errors` | Number of standby requests forwarding failures | errors | counter |
| Metric | Description | Unit | Type |
| :----------------------------------- | :------------------------------------------------------------------- | :----- | :------ |
| `vault.ha.rpc.client.forward` | Time taken to forward a request from a standby to the active node | ms | summary |
| `vault.ha.rpc.client.forward.errors` | Number of standby requests forwarding failures | errors | counter |
| `vault.ha.rpc.client.echo` | Time taken to send an echo request from a standby to the active node | ms | summary |
| `vault.ha.rpc.client.echo.errors` | Number of standby echo request failures | errors | counter |
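
When raft storage is in use, the echo heartbeat also attaches a `peer_id` label (the standby's raft node ID) to `vault.ha.rpc.client.echo`; the errors counter is unlabeled. As a rough illustration only, assuming a Prometheus-formatted sink (where dots become underscores; exact names depend on your telemetry configuration and hypothetical node IDs), a scrape might include lines like:

```
vault_ha_rpc_client_echo{peer_id="node-2",quantile="0.99"} 1.8
vault_ha_rpc_client_echo_errors 3
```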

## Replication Metrics

@@ -470,6 +472,11 @@ These metrics relate to raft based [integrated storage][integrated-storage].
| `vault.raft_storage.bolt.spill.time` | Time taken spilling. | ms | summary |
| `vault.raft_storage.bolt.write.count` | Number of writes performed. | writes | gauge |
| `vault.raft_storage.bolt.write.time` | Time taken writing to disk. | ms | summary |
| `vault.raft_storage.stats.commit_index` | Index of the last raft log entry committed to disk on this node. | sequence number | gauge |
| `vault.raft_storage.stats.applied_index` | Highest raft log index either applied to the FSM or added to the fsm_pending queue. | sequence number | gauge |
| `vault.raft_storage.stats.fsm_pending` | Number of raft log entries this node has queued to be applied by the FSM. | logs | gauge |
| `vault.raft_storage.follower.applied_index_delta` | Delta between the leader's applied index and each follower's applied index, as reported by echo requests. | logs | gauge |
| `vault.raft_storage.follower.last_heartbeat_ms` | Time since the leader last received an echo request from each follower. | ms | gauge |
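
The `stats` gauges are labeled with the emitting node's own `peer_id`, while the `follower` gauges report one series per follower `peer_id`, based on the heartbeat/echo state tracked on the active node. As a rough sketch, again assuming a Prometheus-formatted sink and hypothetical node IDs, this could surface as:

```
vault_raft_storage_stats_applied_index{peer_id="node-1"} 10542
vault_raft_storage_follower_applied_index_delta{peer_id="node-2"} 0
vault_raft_storage_follower_last_heartbeat_ms{peer_id="node-2"} 87
```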

## Integrated Storage (Raft) Autopilot
