
Add more raft metrics, emit more metrics on non-perf standbys #12166

Merged: 11 commits, Oct 7, 2022
3 changes: 3 additions & 0 deletions changelog/12166.txt
@@ -0,0 +1,3 @@
```release-note:improvement
storage/raft: add additional raft metrics relating to applied index and heartbeating; also ensure OSS standbys emit periodic metrics.
```
13 changes: 13 additions & 0 deletions physical/raft/raft.go
@@ -489,9 +489,22 @@ func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) {
b.l.RLock()
logstoreStats := b.stableStore.(*raftboltdb.BoltStore).Stats()
fsmStats := b.fsm.db.Stats()
stats := b.raft.Stats()
b.l.RUnlock()
b.collectMetricsWithStats(logstoreStats, sink, "logstore")
b.collectMetricsWithStats(fsmStats, sink, "fsm")
labels := []metrics.Label{
{
Name: "peer_id",
Value: b.localID,
},
}
for _, key := range []string{"term", "commit_index", "applied_index", "fsm_pending"} {
Contributor commented:
One small thing here. It's not super obvious, but b.raft.Stats()["applied_index"] is actually the latest index that the raft library has queued to the FSM (it includes fsm_pending items), not what has actually been applied. Our heartbeating mechanism uses the actual last applied index we've seen in the FSM. If the intent is to compare this value between nodes, you might find that it's more up-to-date than reality. If that's not the intent (which it may not be, given the existence of the delta metric below), then we may just want to leave it as is and mention this fact in the docs?

Collaborator (author) replied:
I think I'll take the doc strategy. As you say, applied_index_delta can tell us how far behind followers are. IIRC here I was mostly just looking to expose potentially useful info from the raft lib for debugging.

n, err := strconv.ParseUint(stats[key], 10, 64)
if err == nil {
sink.SetGaugeWithLabels([]string{"raft_storage", "stats", key}, float32(n), labels)
}
}
}

func (b *RaftBackend) collectMetricsWithStats(stats bolt.Stats, sink *metricsutil.ClusterMetricSink, database string) {
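As an aside on the applied_index discussion above, here is a minimal sketch of how the gap between what the raft library reports and what the FSM has actually applied could be surfaced for debugging. Only the first gauge mirrors the raft_storage.stats gauges added in this PR; the emitAppliedIndexGauges helper, the caller-supplied fsmApplied value, and the fsm.applied_index / fsm.apply_lag metric names are hypothetical illustrations, assuming only the hashicorp/raft Stats() map and the armon/go-metrics gauge API.

```go
package raftmetrics

import (
	"strconv"

	metrics "github.com/armon/go-metrics"
	"github.com/hashicorp/raft"
)

// emitAppliedIndexGauges is a hypothetical helper: stats["applied_index"]
// reflects what the raft library has queued to the FSM (including
// fsm_pending entries), while fsmApplied is the index the FSM has actually
// finished applying, tracked separately by the caller.
func emitAppliedIndexGauges(r *raft.Raft, fsmApplied uint64, peerID string) {
	labels := []metrics.Label{{Name: "peer_id", Value: peerID}}

	stats := r.Stats() // map[string]string of raft library internals
	queued, err := strconv.ParseUint(stats["applied_index"], 10, 64)
	if err != nil {
		return
	}

	// Index the raft library reports; may run ahead of the FSM.
	metrics.SetGaugeWithLabels([]string{"raft_storage", "stats", "applied_index"}, float32(queued), labels)
	// Index the FSM has actually applied (hypothetical metric name).
	metrics.SetGaugeWithLabels([]string{"raft_storage", "fsm", "applied_index"}, float32(fsmApplied), labels)
	// Gap between the two; a persistently large value means the FSM is falling behind.
	if queued >= fsmApplied {
		metrics.SetGaugeWithLabels([]string{"raft_storage", "fsm", "apply_lag"}, float32(queued-fsmApplied), labels)
	}
}
```

The PR's own answer to the follower-lag question is the per-peer raft_storage.follower.applied_index_delta gauge emitted in startFollowerHeartbeatTracker below, computed from the leader's AppliedIndex() minus each follower's reported index.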
28 changes: 21 additions & 7 deletions physical/raft/raft_autopilot.go
@@ -11,14 +11,13 @@ import (
"sync"
"time"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-secure-stdlib/parseutil"
"github.com/hashicorp/go-secure-stdlib/strutil"
"go.uber.org/atomic"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/mitchellh/mapstructure"
"go.uber.org/atomic"
)

type CleanupDeadServersValue int
@@ -470,11 +469,26 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() {
tickerCh := b.followerHeartbeatTicker.C
b.l.RUnlock()

followerGauge := func(peerID string, suffix string, value float32) {
labels := []metrics.Label{
{
Name: "peer_id",
Value: peerID,
},
}
metrics.SetGaugeWithLabels([]string{"raft_storage", "follower", suffix}, value, labels)
}
for range tickerCh {
b.l.RLock()
if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 {
b.followerStates.l.RLock()
for _, state := range b.followerStates.followers {
b.followerStates.l.RLock()
myAppliedIndex := b.raft.AppliedIndex()
for peerID, state := range b.followerStates.followers {
timeSinceLastHeartbeat := time.Now().Sub(state.LastHeartbeat) / time.Millisecond
b.logger.Trace("follower metrics", "peerID", peerID, "myAppliedIndex", myAppliedIndex, "appliedIndex", state.AppliedIndex, "lastHeartbeat", state.LastHeartbeat)
followerGauge(peerID, "last_heartbeat_ms", float32(timeSinceLastHeartbeat))
followerGauge(peerID, "applied_index_delta", float32(myAppliedIndex-state.AppliedIndex))

if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 {
if state.LastHeartbeat.IsZero() || state.IsDead.Load() {
continue
}
@@ -483,8 +497,8 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() {
state.IsDead.Store(true)
}
}
b.followerStates.l.RUnlock()
}
b.followerStates.l.RUnlock()
b.l.RUnlock()
}
}
6 changes: 3 additions & 3 deletions vault/core.go
@@ -2054,6 +2054,9 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c
return err
}

c.metricsCh = make(chan struct{})
go c.emitMetricsActiveNode(c.metricsCh)

return nil
}

@@ -2108,9 +2111,6 @@ func (c *Core) postUnseal(ctx context.Context, ctxCancelFunc context.CancelFunc,
}
}

c.metricsCh = make(chan struct{})
go c.emitMetrics(c.metricsCh)

// This is intentionally the last block in this function. We want to allow
// writes just before allowing client requests, to ensure everything has
// been set up properly before any writes can have happened.
33 changes: 18 additions & 15 deletions vault/core_metrics.go
@@ -38,7 +38,12 @@ func (c *Core) metricsLoop(stopCh chan struct{}) {
for {
select {
case <-emitTimer:
if !c.PerfStandby() {
if stopped := grabLockOrStop(c.stateLock.RLock, c.stateLock.RUnlock, stopCh); stopped {
// Go through the loop again, this time the stop channel case
// should trigger
continue
}
if !c.perfStandby {
c.metricsMutex.Lock()
// Emit on active node only
if c.expiration != nil {
@@ -55,13 +60,13 @@
}

// Refresh the standby gauge, on all nodes
if standby, _ := c.Standby(); standby {
if c.standby {
c.metricSink.SetGaugeWithLabels([]string{"core", "active"}, 0, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "active"}, 1, nil)
}

if perfStandby := c.PerfStandby(); perfStandby {
if c.perfStandby {
c.metricSink.SetGaugeWithLabels([]string{"core", "performance_standby"}, 1, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "performance_standby"}, 0, nil)
@@ -91,13 +96,14 @@ func (c *Core) metricsLoop(stopCh chan struct{}) {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "secondary"}, 0, nil)
}

// Refresh gauge metrics that are looped
c.cachedGaugeMetricsEmitter()

// If we're using a raft backend, emit boltdb metrics
// If we're using a raft backend, emit raft metrics
if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
rb.CollectMetrics(c.MetricSink())
}

// Refresh gauge metrics that are looped
c.cachedGaugeMetricsEmitter()
c.stateLock.RUnlock()
case <-writeTimer:
if stopped := grabLockOrStop(c.stateLock.RLock, c.stateLock.RUnlock, stopCh); stopped {
// Go through the loop again, this time the stop channel case
@@ -187,15 +193,12 @@ func (c *Core) tokenGaugeTtlCollector(ctx context.Context) ([]metricsutil.GaugeL
return ts.gaugeCollectorByTtl(ctx)
}

// emitMetrics is used to start all the periodc metrics; all of them should
// be shut down when stopCh is closed.
func (c *Core) emitMetrics(stopCh chan struct{}) {
// emitMetricsActiveNode is used to start all the periodic metrics; all of them should
// be shut down when stopCh is closed. This code runs on the active node only.
func (c *Core) emitMetricsActiveNode(stopCh chan struct{}) {
// The gauge collection processes are started and stopped here
// because there's more than one TokenManager created during startup,
// but we only want one set of gauges.
//
// Both active nodes and performance standby nodes call emitMetrics
// so we have to handle both.
metricsInit := []struct {
MetricName []string
MetadataLabel []metrics.Label
@@ -304,8 +307,8 @@ func (c *Core) findKvMounts() []*kvMount {
c.mountsLock.RLock()
defer c.mountsLock.RUnlock()

// emitMetrics doesn't grab the statelock, so this code might run during or after the seal process.
// Therefore, we need to check if c.mounts is nil. If we do not, emitMetrics will panic if this is
// we don't grab the statelock, so this code might run during or after the seal process.
// Therefore, we need to check if c.mounts is nil. If we do not, this will panic when
// run after seal.
if c.mounts == nil {
return mounts
11 changes: 11 additions & 0 deletions vault/ha.go
@@ -378,6 +378,17 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) {
c.logger.Debug("shutting down periodic leader refresh")
})
}
{
metricsStop := make(chan struct{})

g.Add(func() error {
c.metricsLoop(metricsStop)
return nil
}, func(error) {
close(metricsStop)
c.logger.Debug("shutting down periodic metrics")
})
}
{
// Wait for leadership
leaderStopCh := make(chan struct{})
8 changes: 6 additions & 2 deletions vault/request_forwarding_rpc.go
@@ -7,10 +7,10 @@ import (
"sync/atomic"
"time"

"github.com/hashicorp/vault/sdk/helper/consts"

"github.com/armon/go-metrics"
"github.com/hashicorp/vault/helper/forwarding"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/vault/replication"
)

@@ -105,6 +105,8 @@ type forwardingClient struct {
func (c *forwardingClient) startHeartbeat() {
go func() {
tick := func() {
labels := []metrics.Label{}
defer metrics.MeasureSinceWithLabels([]string{"ha", "rpc", "client", "echo"}, time.Now(), labels)
clusterAddr := c.core.ClusterAddr()

req := &EchoRequest{
@@ -117,12 +119,14 @@
req.RaftNodeID = raftBackend.NodeID()
req.RaftTerm = raftBackend.Term()
req.RaftDesiredSuffrage = raftBackend.DesiredSuffrage()
labels = append(labels, metrics.Label{Name: "peer_id", Value: raftBackend.NodeID()})
}

ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second)
resp, err := c.RequestForwardingClient.Echo(ctx, req)
cancel()
if err != nil {
metrics.IncrCounter([]string{"ha", "rpc", "client", "echo", "errors"}, 1)
c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err)
return
}