Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: improve lag metrics #610

Merged
merged 19 commits into from
Nov 26, 2024
70 changes: 43 additions & 27 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/pingcap/ticdc/logservice/logservicepb"
"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/ticdc/utils/dynstream"
Expand Down Expand Up @@ -169,6 +170,10 @@ type eventStore struct {

encoder *zstd.Encoder
decoder *zstd.Decoder

metricEventStoreDSAddPathNum prometheus.Gauge
metricEventStoreDSRemovePathNum prometheus.Gauge
metricEventStoreDSArrageStreamNum prometheus.Gauge
}

const (
Expand Down Expand Up @@ -243,6 +248,10 @@ func New(
gcManager: newGCManager(),
encoder: encoder,
decoder: decoder,

metricEventStoreDSAddPathNum: metrics.DynamicStreamAddPathNum.WithLabelValues("event-store"),
metricEventStoreDSRemovePathNum: metrics.DynamicStreamRemovePathNum.WithLabelValues("event-store"),
metricEventStoreDSArrageStreamNum: metrics.DynamicStreamArrangeStreamNum.WithLabelValues("event-store"),
}
// TODO: update pebble options
for i := 0; i < dbCount; i++ {
Expand Down Expand Up @@ -657,7 +666,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
}

func (e *eventStore) updateMetrics(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(50 * time.Millisecond)
for {
select {
case <-ctx.Done():
Expand All @@ -669,35 +678,42 @@ func (e *eventStore) updateMetrics(ctx context.Context) error {
}

// updateMetricsOnce publishes one snapshot of the event store's dynamic stream
// metrics to Prometheus.
//
// It reads e.ds.GetMetrics() once and reports:
//   - the resolved-ts lag: local wall-clock physical time minus the physical
//     part of MinHandledTS, divided by 1e3 (presumably milliseconds to
//     seconds — confirm against the oracle package);
//   - the dynamic stream event channel size and pending queue length;
//   - the add-path, remove-path and arrange-stream counts.
//
// NOTE: the per-subscription lag histograms
// (metrics.EventStoreDispatcherResolvedTsLagHist and
// metrics.EventStoreDispatcherWatermarkLagHist) are currently not observed
// here; only the store-wide lag gauge derived from the dynamic stream is set.
func (e *eventStore) updateMetricsOnce() {
	dsMetrics := e.ds.GetMetrics()

	// Skip the lag gauge when there is no usable timestamp. 0 means nothing
	// has been reported, and the all-ones value is the seed the parallel
	// dynamic stream uses for its minimum, so it shows up when no stream
	// lowered it. Either would yield a meaningless lag.
	if ts := dsMetrics.MinHandledTS; ts != 0 && ts != ^uint64(0) {
		lag := float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(ts)) / 1e3
		metrics.EventStoreResolvedTsLagGauge.Set(lag)
	}

	metricEventStoreDSChannelSize.Set(float64(dsMetrics.EventChanSize))
	metricEventStoreDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen))
	e.metricEventStoreDSAddPathNum.Set(float64(dsMetrics.AddPath))
	e.metricEventStoreDSRemovePathNum.Set(float64(dsMetrics.RemovePath))
	e.metricEventStoreDSArrageStreamNum.Set(float64(dsMetrics.ArrangeStream))
}

func (e *eventStore) writeEvents(db *pebble.DB, events []kvEvent) error {
Expand Down
6 changes: 4 additions & 2 deletions logservice/eventstore/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ func (h *eventsHandler) Handle(subStat *subscriptionStat, events ...kvEvent) boo

// GetSize always reports 0 for any event.
func (h *eventsHandler) GetSize(event kvEvent) int {
	return 0
}
// GetArea always reports area 0, regardless of the path or destination.
func (h *eventsHandler) GetArea(path logpuller.SubscriptionID, dest *subscriptionStat) int {
	return 0
}
// GetTimestamp returns the event's raw CRTs converted to a dynamic stream
// timestamp.
func (h *eventsHandler) GetTimestamp(event kvEvent) dynstream.Timestamp {
	return dynstream.Timestamp(event.raw.CRTs)
}

// IsPaused always reports false.
func (h *eventsHandler) IsPaused(event kvEvent) bool { return false }

func (h *eventsHandler) GetType(event kvEvent) dynstream.EventType {
if event.raw.IsResolved() {
Expand Down
8 changes: 5 additions & 3 deletions pkg/eventservice/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func (h *dispatcherEventsHandler) GetArea(path common.DispatcherID, dest *eventB
}
return d.info.GetChangefeedID().ID()
}
// GetTimestamp returns the scan task's currently loaded watermark converted to
// a dynamic stream timestamp.
func (h *dispatcherEventsHandler) GetTimestamp(event scanTask) dynstream.Timestamp {
	return dynstream.Timestamp(event.watermark.Load())
}

// IsPaused always reports false.
func (h *dispatcherEventsHandler) IsPaused(event scanTask) bool { return false }

// OnDrop does nothing when a scan task is dropped.
func (h *dispatcherEventsHandler) OnDrop(event scanTask) {}
24 changes: 24 additions & 0 deletions pkg/metrics/ds.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,34 @@ var (
Subsystem: "dynamic_stream",
Name: "pending_queue_len",
}, []string{"component"})
DynamicStreamAddPathNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "dynamic_stream",
Name: "add_path_num",
Help: "The number of add path command",
}, []string{"component"})
DynamicStreamRemovePathNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "dynamic_stream",
Name: "remove_path_num",
Help: "The number of remove path command",
}, []string{"component"})
DynamicStreamArrangeStreamNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "dynamic_stream",
Name: "arrange_stream_num",
Help: "The number of arrange stream command",
}, []string{"component"})
)

// InitDynamicStreamMetrics registers all dynamic stream collectors declared in
// this file with the given registry, in declaration order.
func InitDynamicStreamMetrics(registry *prometheus.Registry) {
	registry.MustRegister(
		DynamicStreamMemoryUsage,
		DynamicStreamEventChanSize,
		DynamicStreamPendingQueueLen,
		DynamicStreamAddPathNum,
		DynamicStreamRemovePathNum,
		DynamicStreamArrangeStreamNum,
	)
}
14 changes: 12 additions & 2 deletions utils/dynstream/parallel_dynamic_stream.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package dynstream

import "math"

// Use a hasher to select target stream for the path.
// It implements the DynamicStream interface.
type parallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] struct {
Expand Down Expand Up @@ -69,9 +71,17 @@ func (s *parallelDynamicStream[A, P, T, D, H]) SetAreaSettings(area A, settings

// GetMetrics aggregates the metrics of every underlying dynamic stream into a
// single Metrics value.
//
// EventChanSize, PendingQueueLen, AddPath, RemovePath and ArrangeStream are
// summed across the streams. MinHandledTS is the smallest value reported by
// any stream, or 0 when no stream lowered the initial seed (for example when
// there are no streams at all). Each stream is queried exactly once.
func (s *parallelDynamicStream[A, P, T, D, H]) GetMetrics() Metrics {
	metrics := Metrics{}
	// Seed the minimum with the largest value so any stream can lower it.
	metrics.MinHandledTS = math.MaxUint64
	for _, ds := range s.dynamicStreams {
		subMetrics := ds.GetMetrics()
		metrics.EventChanSize += subMetrics.EventChanSize
		metrics.PendingQueueLen += subMetrics.PendingQueueLen
		metrics.AddPath += subMetrics.AddPath
		metrics.RemovePath += subMetrics.RemovePath
		metrics.ArrangeStream += subMetrics.ArrangeStream
		if subMetrics.MinHandledTS < metrics.MinHandledTS {
			metrics.MinHandledTS = subMetrics.MinHandledTS
		}
	}
	// Do not leak the seed to callers: consumers treat 0 as "no timestamp",
	// whereas MaxUint64 would be interpreted as a real (far-future) timestamp.
	if metrics.MinHandledTS == math.MaxUint64 {
		metrics.MinHandledTS = 0
	}
	return metrics
}
Loading