diff --git a/changelog/12166.txt b/changelog/12166.txt
new file mode 100644
index 000000000000..9cec76cbafee
--- /dev/null
+++ b/changelog/12166.txt
@@ -0,0 +1,3 @@
+```release-note:improvement
+storage/raft: add additional raft metrics relating to applied index and heartbeating; also ensure OSS standbys emit periodic metrics.
+```
diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go
index 8d5b5524db12..f8ca9c6546f0 100644
--- a/physical/raft/fsm.go
+++ b/physical/raft/fsm.go
@@ -136,6 +136,8 @@ func NewFSM(path string, localID string, logger log.Logger) (*FSM, error) {
 	})
 
 	dbPath := filepath.Join(path, databaseFilename)
+	f.l.Lock()
+	defer f.l.Unlock()
 	if err := f.openDBFile(dbPath); err != nil {
 		return nil, fmt.Errorf("failed to open bolt file: %w", err)
 	}
diff --git a/physical/raft/raft.go b/physical/raft/raft.go
index 98d51c05d96d..7400794f142d 100644
--- a/physical/raft/raft.go
+++ b/physical/raft/raft.go
@@ -581,9 +581,22 @@ func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) {
 	b.l.RLock()
 	logstoreStats := b.stableStore.(*raftboltdb.BoltStore).Stats()
 	fsmStats := b.fsm.db.Stats()
+	stats := b.raft.Stats()
 	b.l.RUnlock()
 	b.collectMetricsWithStats(logstoreStats, sink, "logstore")
 	b.collectMetricsWithStats(fsmStats, sink, "fsm")
+	labels := []metrics.Label{
+		{
+			Name:  "peer_id",
+			Value: b.localID,
+		},
+	}
+	for _, key := range []string{"term", "commit_index", "applied_index", "fsm_pending"} {
+		n, err := strconv.ParseUint(stats[key], 10, 64)
+		if err == nil {
+			sink.SetGaugeWithLabels([]string{"raft_storage", "stats", key}, float32(n), labels)
+		}
+	}
 }
 
 func (b *RaftBackend) collectMetricsWithStats(stats bolt.Stats, sink *metricsutil.ClusterMetricSink, database string) {
diff --git a/physical/raft/raft_autopilot.go b/physical/raft/raft_autopilot.go
index eaa75dfa19a8..5596bbf4255f 100644
--- a/physical/raft/raft_autopilot.go
+++ b/physical/raft/raft_autopilot.go
@@ -540,11 +540,25 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() {
 	tickerCh := b.followerHeartbeatTicker.C
 	b.l.RUnlock()
 
+	followerGauge := func(peerID string, suffix string, value float32) {
+		labels := []metrics.Label{
+			{
+				Name:  "peer_id",
+				Value: peerID,
+			},
+		}
+		metrics.SetGaugeWithLabels([]string{"raft_storage", "follower", suffix}, value, labels)
+	}
 	for range tickerCh {
 		b.l.RLock()
-		if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 {
-			b.followerStates.l.RLock()
-			for _, state := range b.followerStates.followers {
+		b.followerStates.l.RLock()
+		myAppliedIndex := b.raft.AppliedIndex()
+		for peerID, state := range b.followerStates.followers {
+			timeSinceLastHeartbeat := time.Now().Sub(state.LastHeartbeat) / time.Millisecond
+			followerGauge(peerID, "last_heartbeat_ms", float32(timeSinceLastHeartbeat))
+			followerGauge(peerID, "applied_index_delta", float32(myAppliedIndex-state.AppliedIndex))
+
+			if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 {
 				if state.LastHeartbeat.IsZero() || state.IsDead.Load() {
 					continue
 				}
@@ -553,8 +567,8 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() {
 					state.IsDead.Store(true)
 				}
 			}
-			b.followerStates.l.RUnlock()
 		}
+		b.followerStates.l.RUnlock()
 		b.l.RUnlock()
 	}
 }
diff --git a/vault/core.go b/vault/core.go
index 7a4fd8a8f193..cbe526407088 100644
--- a/vault/core.go
+++ b/vault/core.go
@@ -2252,6 +2252,9 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c
 		return err
 	}
 
+	c.metricsCh = make(chan struct{})
+	go c.emitMetricsActiveNode(c.metricsCh)
+
 	return nil
 }
 
@@ -2310,9 +2313,6 @@ func (c *Core) postUnseal(ctx context.Context, ctxCancelFunc context.CancelFunc,
 		seal.StartHealthCheck()
 	}
 
-	c.metricsCh = make(chan struct{})
-	go c.emitMetrics(c.metricsCh)
-
 	// This is intentionally the last block in this function. We want to allow
 	// writes just before allowing client requests, to ensure everything has
 	// been set up properly before any writes can have happened.
diff --git a/vault/core_metrics.go b/vault/core_metrics.go
index cd570eff3b6d..c6e719fc1251 100644
--- a/vault/core_metrics.go
+++ b/vault/core_metrics.go
@@ -113,16 +113,16 @@ func (c *Core) metricsLoop(stopCh chan struct{}) {
 				c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "secondary"}, 0, nil)
 			}
 
+			// If we're using a raft backend, emit raft metrics
+			if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
+				rb.CollectMetrics(c.MetricSink())
+			}
+
 			// Capture the total number of in-flight requests
 			c.inFlightReqGaugeMetric()
 
 			// Refresh gauge metrics that are looped
 			c.cachedGaugeMetricsEmitter()
-
-			// If we're using a raft backend, emit boltdb metrics
-			if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
-				rb.CollectMetrics(c.MetricSink())
-			}
 		case <-writeTimer:
 			l := newLockGrabber(c.stateLock.RLock, c.stateLock.RUnlock, stopCh)
 			go l.grab()
@@ -232,15 +232,12 @@ func (c *Core) tokenGaugeTtlCollector(ctx context.Context) ([]metricsutil.GaugeL
 	return ts.gaugeCollectorByTtl(ctx)
 }
 
-// emitMetrics is used to start all the periodc metrics; all of them should
-// be shut down when stopCh is closed.
-func (c *Core) emitMetrics(stopCh chan struct{}) {
+// emitMetricsActiveNode is used to start all the periodic metrics; all of them should
+// be shut down when stopCh is closed. This code runs on the active node only.
+func (c *Core) emitMetricsActiveNode(stopCh chan struct{}) {
 	// The gauge collection processes are started and stopped here
 	// because there's more than one TokenManager created during startup,
 	// but we only want one set of gauges.
-	//
-	// Both active nodes and performance standby nodes call emitMetrics
-	// so we have to handle both.
 	metricsInit := []struct {
 		MetricName    []string
 		MetadataLabel []metrics.Label
@@ -349,8 +346,8 @@ func (c *Core) findKvMounts() []*kvMount {
 	c.mountsLock.RLock()
 	defer c.mountsLock.RUnlock()
 
-	// emitMetrics doesn't grab the statelock, so this code might run during or after the seal process.
-	// Therefore, we need to check if c.mounts is nil. If we do not, emitMetrics will panic if this is
+	// we don't grab the statelock, so this code might run during or after the seal process.
+	// Therefore, we need to check if c.mounts is nil. If we do not, this will panic when
 	// run after seal.
 	if c.mounts == nil {
 		return mounts
diff --git a/vault/ha.go b/vault/ha.go
index 4f674dde9184..17b6e590d6b5 100644
--- a/vault/ha.go
+++ b/vault/ha.go
@@ -434,6 +434,17 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) {
 			c.logger.Debug("shutting down periodic leader refresh")
 		})
 	}
+	{
+		metricsStop := make(chan struct{})
+
+		g.Add(func() error {
+			c.metricsLoop(metricsStop)
+			return nil
+		}, func(error) {
+			close(metricsStop)
+			c.logger.Debug("shutting down periodic metrics")
+		})
+	}
 	{
 		// Wait for leadership
 		leaderStopCh := make(chan struct{})
diff --git a/vault/request_forwarding_rpc.go b/vault/request_forwarding_rpc.go
index 6ae4cf56b74f..281d9192bba9 100644
--- a/vault/request_forwarding_rpc.go
+++ b/vault/request_forwarding_rpc.go
@@ -8,6 +8,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/armon/go-metrics"
 	"github.com/hashicorp/vault/helper/forwarding"
 	"github.com/hashicorp/vault/physical/raft"
 	"github.com/hashicorp/vault/sdk/helper/consts"
@@ -135,6 +136,9 @@ func (c *forwardingClient) startHeartbeat() {
 		Mode:        "standby",
 	}
 	tick := func() {
+		labels := make([]metrics.Label, 0, 1)
+		defer metrics.MeasureSinceWithLabels([]string{"ha", "rpc", "client", "echo"}, time.Now(), labels)
+
 		req := &EchoRequest{
 			Message:     "ping",
 			ClusterAddr: clusterAddr,
@@ -149,12 +153,14 @@ func (c *forwardingClient) startHeartbeat() {
 			req.RaftDesiredSuffrage = raftBackend.DesiredSuffrage()
 			req.RaftRedundancyZone = raftBackend.RedundancyZone()
 			req.RaftUpgradeVersion = raftBackend.EffectiveVersion()
+			labels = append(labels, metrics.Label{Name: "peer_id", Value: raftBackend.NodeID()})
 		}
 
 		ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second)
 		resp, err := c.RequestForwardingClient.Echo(ctx, req)
 		cancel()
 		if err != nil {
+			metrics.IncrCounter([]string{"ha", "rpc", "client", "echo", "errors"}, 1)
 			c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err)
 			return
 		}
diff --git a/website/content/docs/internals/telemetry.mdx b/website/content/docs/internals/telemetry.mdx
index 6633141ced18..2785a4281416 100644
--- a/website/content/docs/internals/telemetry.mdx
+++ b/website/content/docs/internals/telemetry.mdx
@@ -222,10 +222,12 @@ These metrics relate to internal operations on Merkle Trees and Write Ahead Logs
 
 These metrics are emitted on standbys when talking to the active node, and in some cases by performance standbys as well.
 
-| Metric                                | Description                                                        | Unit   | Type    |
-| :------------------------------------ | :----------------------------------------------------------------- | :----- | :------ |
-| `vault.ha.rpc.client.forward`         | Time taken to forward a request from a standby to the active node  | ms     | summary |
-| `vault.ha.rpc.client.forward.errors`  | Number of standby requests forwarding failures                     | errors | counter |
+| Metric                                | Description                                                           | Unit   | Type    |
+| :------------------------------------ | :--------------------------------------------------------------------- | :----- | :------ |
+| `vault.ha.rpc.client.forward`         | Time taken to forward a request from a standby to the active node      | ms     | summary |
+| `vault.ha.rpc.client.forward.errors`  | Number of standby requests forwarding failures                         | errors | counter |
+| `vault.ha.rpc.client.echo`            | Time taken to send an echo request from a standby to the active node   | ms     | summary |
+| `vault.ha.rpc.client.echo.errors`     | Number of standby echo request failures                                | errors | counter |
 
 ## Replication Metrics
 
@@ -474,6 +476,11 @@ These metrics relate to raft based [integrated storage][integrated-storage].
 | `vault.raft_storage.bolt.spill.time`  | Time taken spilling.        | ms     | summary |
 | `vault.raft_storage.bolt.write.count` | Number of writes performed. | writes | gauge   |
 | `vault.raft_storage.bolt.write.time`  | Time taken writing to disk. | ms     | summary |
+| `vault.raft_storage.stats.commit_index`           | Index of the last raft log committed to disk on this node.                                                      | sequence number | gauge |
+| `vault.raft_storage.stats.applied_index`          | Highest raft log index either applied to the FSM or added to the fsm_pending queue.                             | sequence number | gauge |
+| `vault.raft_storage.stats.fsm_pending`            | Number of raft logs this node has queued to be applied by the FSM.                                              | logs            | gauge |
+| `vault.raft_storage.follower.applied_index_delta` | Delta between the leader's applied index and the applied index each follower last reported via an echo request. | logs            | gauge |
+| `vault.raft_storage.follower.last_heartbeat_ms`   | Time since the last echo request was received from each follower.                                               | ms              | gauge |
 
 ## Integrated Storage (Raft) Autopilot
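Reviewer note, not part of the patch: a minimal, self-contained sketch of how the labeled follower gauges added in `raft_autopilot.go` and documented above surface through `armon/go-metrics`. The in-memory sink, the `vault` service prefix, the peer ID, and the values are illustrative assumptions; Vault itself routes these gauges through its own cluster metric sink.

```go
package main

import (
	"fmt"
	"time"

	metrics "github.com/armon/go-metrics"
)

func main() {
	// In-memory sink so the emitted gauges can be read back in-process.
	// This wiring is only for illustration.
	inm := metrics.NewInmemSink(10*time.Second, time.Minute)
	cfg := metrics.DefaultConfig("vault")
	cfg.EnableHostname = false
	cfg.EnableRuntimeMetrics = false
	if _, err := metrics.NewGlobal(cfg, inm); err != nil {
		panic(err)
	}

	// Mirrors the followerGauge helper in raft_autopilot.go: one gauge per
	// follower, labeled with that follower's peer ID. The peer ID and the
	// values below are made up.
	followerGauge := func(peerID, suffix string, value float32) {
		labels := []metrics.Label{{Name: "peer_id", Value: peerID}}
		metrics.SetGaugeWithLabels([]string{"raft_storage", "follower", suffix}, value, labels)
	}
	followerGauge("raft-node-2", "last_heartbeat_ms", 250)
	followerGauge("raft-node-2", "applied_index_delta", 3)

	// Dump the current interval: keys come out flattened under the "vault"
	// prefix (e.g. raft_storage.follower.last_heartbeat_ms), each carrying
	// its peer_id label.
	for _, interval := range inm.Data() {
		for key, gauge := range interval.Gauges {
			fmt.Printf("%s = %v labels=%v\n", key, gauge.Value, gauge.Labels)
		}
	}
}
```

Run directly, this prints the flattened gauge keys and their `peer_id` labels from the in-memory sink, which is a quick way to sanity-check what a statsd or Prometheus sink would be fed.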